summaryrefslogtreecommitdiff
path: root/Lib/test/test_gzip.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_gzip.py')
-rw-r--r--Lib/test/test_gzip.py31
1 files changed, 27 insertions, 4 deletions
diff --git a/Lib/test/test_gzip.py b/Lib/test/test_gzip.py
index b417044bcb..d8408e15cd 100644
--- a/Lib/test/test_gzip.py
+++ b/Lib/test/test_gzip.py
@@ -6,6 +6,7 @@ from test import support
import os
import io
import struct
+import array
gzip = support.import_module('gzip')
data1 = b""" int length=DEFAULTALLOC, err = Z_OK;
@@ -77,15 +78,18 @@ class TestGzip(BaseTest):
def test_write_bytearray(self):
self.write_and_read_back(bytearray(data1 * 50))
+ def test_write_array(self):
+ self.write_and_read_back(array.array('I', data1 * 40))
+
def test_write_incompatible_type(self):
# Test that non-bytes-like types raise TypeError.
# Issue #21560: attempts to write incompatible types
# should not affect the state of the fileobject
with gzip.GzipFile(self.filename, 'wb') as f:
with self.assertRaises(TypeError):
- f.write('a')
+ f.write('')
with self.assertRaises(TypeError):
- f.write([1])
+ f.write([])
f.write(data1)
with gzip.GzipFile(self.filename, 'rb') as f:
self.assertEqual(f.read(), data1)
@@ -119,7 +123,10 @@ class TestGzip(BaseTest):
# Write to a file, open it for reading, then close it.
self.test_write()
f = gzip.GzipFile(self.filename, 'r')
+ fileobj = f.fileobj
+ self.assertFalse(fileobj.closed)
f.close()
+ self.assertTrue(fileobj.closed)
with self.assertRaises(ValueError):
f.read(1)
with self.assertRaises(ValueError):
@@ -128,7 +135,10 @@ class TestGzip(BaseTest):
f.tell()
# Open the file for writing, then close it.
f = gzip.GzipFile(self.filename, 'w')
+ fileobj = f.fileobj
+ self.assertFalse(fileobj.closed)
f.close()
+ self.assertTrue(fileobj.closed)
with self.assertRaises(ValueError):
f.write(b'')
with self.assertRaises(ValueError):
@@ -267,9 +277,10 @@ class TestGzip(BaseTest):
with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
fWrite.write(data1)
with gzip.GzipFile(self.filename) as fRead:
+ self.assertTrue(hasattr(fRead, 'mtime'))
+ self.assertIsNone(fRead.mtime)
dataRead = fRead.read()
self.assertEqual(dataRead, data1)
- self.assertTrue(hasattr(fRead, 'mtime'))
self.assertEqual(fRead.mtime, mtime)
def test_metadata(self):
@@ -412,6 +423,18 @@ class TestGzip(BaseTest):
with gzip.GzipFile(str_filename, "rb") as f:
self.assertEqual(f.read(), data1 * 50)
+ def test_decompress_limited(self):
+ """Decompressed data buffering should be limited"""
+ bomb = gzip.compress(bytes(int(2e6)), compresslevel=9)
+ self.assertLess(len(bomb), io.DEFAULT_BUFFER_SIZE)
+
+ bomb = io.BytesIO(bomb)
+ decomp = gzip.GzipFile(fileobj=bomb)
+ self.assertEqual(bytes(1), decomp.read(1))
+ max_decomp = 1 + io.DEFAULT_BUFFER_SIZE
+ self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp,
+ "Excessive amount of data was decompressed")
+
# Testing compress/decompress shortcut functions
def test_compress(self):
@@ -459,7 +482,7 @@ class TestGzip(BaseTest):
with gzip.open(self.filename, "wb") as f:
f.write(data1)
with gzip.open(self.filename, "rb") as f:
- f.fileobj.prepend()
+ f._buffer.raw._fp.prepend()
class TestOpen(BaseTest):
def test_binary_modes(self):