summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--Lib/codecs.py71
-rw-r--r--Lib/test/test_codecs.py93
2 files changed, 121 insertions, 43 deletions
diff --git a/Lib/codecs.py b/Lib/codecs.py
index ab12237f88..f6d480cc98 100644
--- a/Lib/codecs.py
+++ b/Lib/codecs.py
@@ -230,6 +230,7 @@ class StreamReader(Codec):
self.errors = errors
self.bytebuffer = ""
self.charbuffer = u""
+ self.atcr = False
def decode(self, input, errors='strict'):
raise NotImplementedError
@@ -256,41 +257,39 @@ class StreamReader(Codec):
definition of the encoding and the given size, e.g. if
optional encoding endings or state markers are available
on the stream, these should be read too.
-
"""
# read until we get the required number of characters (if available)
- done = False
while True:
# can the request can be satisfied from the character buffer?
if chars < 0:
if self.charbuffer:
- done = True
+ break
else:
if len(self.charbuffer) >= chars:
- done = True
- if done:
- if chars < 0:
- result = self.charbuffer
- self.charbuffer = u""
- break
- else:
- result = self.charbuffer[:chars]
- self.charbuffer = self.charbuffer[chars:]
break
# we need more data
if size < 0:
newdata = self.stream.read()
else:
newdata = self.stream.read(size)
+ # decode bytes (those remaining from the last call included)
data = self.bytebuffer + newdata
- object, decodedbytes = self.decode(data, self.errors)
+ newchars, decodedbytes = self.decode(data, self.errors)
# keep undecoded bytes until the next call
self.bytebuffer = data[decodedbytes:]
# put new characters in the character buffer
- self.charbuffer += object
+ self.charbuffer += newchars
# there was no data available
if not newdata:
- done = True
+ break
+ if chars < 0:
+ # Return everything we've got
+ result = self.charbuffer
+ self.charbuffer = u""
+ else:
+ # Return the first chars characters
+ result = self.charbuffer[:chars]
+ self.charbuffer = self.charbuffer[chars:]
return result
def readline(self, size=None, keepends=True):
@@ -302,24 +301,36 @@ class StreamReader(Codec):
read() method.
"""
- if size is None:
- size = 10
+ readsize = size or 72
line = u""
+ # If size is given, we call read() only once
while True:
- data = self.read(size)
+ data = self.read(readsize)
+ if self.atcr and data.startswith(u"\n"):
+ data = data[1:]
+ if data:
+ self.atcr = data.endswith(u"\r")
line += data
- pos = line.find("\n")
- if pos>=0:
- self.charbuffer = line[pos+1:] + self.charbuffer
- if keepends:
- line = line[:pos+1]
- else:
- line = line[:pos]
- return line
- elif not data:
- return line
- if size<8000:
- size *= 2
+ lines = line.splitlines(True)
+ if lines:
+ line0withend = lines[0]
+ line0withoutend = lines[0].splitlines(False)[0]
+ if line0withend != line0withoutend: # We really have a line end
+ # Put the rest back together and keep it until the next call
+ self.charbuffer = u"".join(lines[1:]) + self.charbuffer
+ if keepends:
+ line = line0withend
+ else:
+ line = line0withoutend
+ break
+ # we didn't get anything or this was our only try
+ elif not data or size is not None:
+ if line and not keepends:
+ line = line.splitlines(False)[0]
+ break
+ if readsize<8000:
+ readsize *= 2
+ return line
def readlines(self, sizehint=None, keepends=True):
diff --git a/Lib/test/test_codecs.py b/Lib/test/test_codecs.py
index 21ae46798c..36c4040177 100644
--- a/Lib/test/test_codecs.py
+++ b/Lib/test/test_codecs.py
@@ -23,16 +23,16 @@ class Queue(object):
self._buffer = self._buffer[size:]
return s
-class PartialReadTest(unittest.TestCase):
- def check_partial(self, encoding, input, partialresults):
+class ReadTest(unittest.TestCase):
+ def check_partial(self, input, partialresults):
# get a StreamReader for the encoding and feed the bytestring version
# of input to the reader byte by byte. Read every available from
# the StreamReader and check that the results equal the appropriate
# entries from partialresults.
q = Queue()
- r = codecs.getreader(encoding)(q)
+ r = codecs.getreader(self.encoding)(q)
result = u""
- for (c, partialresult) in zip(input.encode(encoding), partialresults):
+ for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
q.write(c)
result += r.read()
self.assertEqual(result, partialresult)
@@ -41,13 +41,81 @@ class PartialReadTest(unittest.TestCase):
self.assertEqual(r.bytebuffer, "")
self.assertEqual(r.charbuffer, u"")
-class UTF16Test(PartialReadTest):
+ def test_readline(self):
+ def getreader(input):
+ stream = StringIO.StringIO(input.encode(self.encoding))
+ return codecs.getreader(self.encoding)(stream)
+
+ def readalllines(input, keepends=True):
+ reader = getreader(input)
+ lines = []
+ while True:
+ line = reader.readline(keepends=keepends)
+ if not line:
+ break
+ lines.append(line)
+ return "".join(lines)
+
+ s = u"foo\nbar\r\nbaz\rspam\u2028eggs"
+ self.assertEqual(readalllines(s, True), s)
+ self.assertEqual(readalllines(s, False), u"foobarbazspameggs")
+
+ # Test long lines (multiple calls to read() in readline())
+ vw = []
+ vwo = []
+ for (i, lineend) in enumerate(u"\n \r\n \r \u2028".split()):
+            vw.append((i*200)*u"\u3042" + lineend)
+            vwo.append((i*200)*u"\u3042")
+ self.assertEqual(readalllines("".join(vw), True), "".join(vw))
+ self.assertEqual(readalllines("".join(vw), False),"".join(vwo))
+
+ # Test lines where the first read might end with \r, so the
+ # reader has to look ahead whether this is a lone \r or a \r\n
+ for size in xrange(80):
+ for lineend in u"\n \r\n \r \u2028".split():
+ s = size*u"a" + lineend + u"xxx\n"
+ self.assertEqual(
+ getreader(s).readline(keepends=True),
+ size*u"a" + lineend,
+ )
+ self.assertEqual(
+ getreader(s).readline(keepends=False),
+ size*u"a",
+ )
+
+ def test_readlinequeue(self):
+ q = Queue()
+ writer = codecs.getwriter(self.encoding)(q)
+ reader = codecs.getreader(self.encoding)(q)
+
+ # No lineends
+ writer.write(u"foo\r")
+ self.assertEqual(reader.readline(keepends=False), u"foo")
+ writer.write(u"\nbar\r")
+ self.assertEqual(reader.readline(keepends=False), u"bar")
+ writer.write(u"baz")
+ self.assertEqual(reader.readline(keepends=False), u"baz")
+ self.assertEqual(reader.readline(keepends=False), u"")
+
+ # Lineends
+ writer.write(u"foo\r")
+ self.assertEqual(reader.readline(keepends=True), u"foo\r")
+ writer.write(u"\nbar\r")
+ self.assertEqual(reader.readline(keepends=True), u"bar\r")
+ writer.write(u"baz")
+ self.assertEqual(reader.readline(keepends=True), u"baz")
+ self.assertEqual(reader.readline(keepends=True), u"")
+ writer.write(u"foo\r\n")
+ self.assertEqual(reader.readline(keepends=True), u"foo\r\n")
+
+class UTF16Test(ReadTest):
+ encoding = "utf-16"
spamle = '\xff\xfes\x00p\x00a\x00m\x00s\x00p\x00a\x00m\x00'
spambe = '\xfe\xff\x00s\x00p\x00a\x00m\x00s\x00p\x00a\x00m'
def test_only_one_bom(self):
- _,_,reader,writer = codecs.lookup("utf-16")
+ _,_,reader,writer = codecs.lookup(self.encoding)
# encode some stream
s = StringIO.StringIO()
f = writer(s)
@@ -63,7 +131,6 @@ class UTF16Test(PartialReadTest):
def test_partial(self):
self.check_partial(
- "utf-16",
u"\x00\xff\u0100\uffff",
[
u"", # first byte of BOM read
@@ -79,11 +146,11 @@ class UTF16Test(PartialReadTest):
]
)
-class UTF16LETest(PartialReadTest):
+class UTF16LETest(ReadTest):
+ encoding = "utf-16-le"
def test_partial(self):
self.check_partial(
- "utf-16-le",
u"\x00\xff\u0100\uffff",
[
u"",
@@ -97,11 +164,11 @@ class UTF16LETest(PartialReadTest):
]
)
-class UTF16BETest(PartialReadTest):
+class UTF16BETest(ReadTest):
+ encoding = "utf-16-be"
def test_partial(self):
self.check_partial(
- "utf-16-be",
u"\x00\xff\u0100\uffff",
[
u"",
@@ -115,11 +182,11 @@ class UTF16BETest(PartialReadTest):
]
)
-class UTF8Test(PartialReadTest):
+class UTF8Test(ReadTest):
+ encoding = "utf-8"
def test_partial(self):
self.check_partial(
- "utf-8",
u"\x00\xff\u07ff\u0800\uffff",
[
u"\x00",