summaryrefslogtreecommitdiff
path: root/Lib
diff options
context:
space:
mode:
Diffstat (limited to 'Lib')
-rw-r--r--Lib/tempfile.py114
-rw-r--r--Lib/test/test_tempfile.py104
2 files changed, 217 insertions, 1 deletions
diff --git a/Lib/tempfile.py b/Lib/tempfile.py
index ce03bb7880..bb23785726 100644
--- a/Lib/tempfile.py
+++ b/Lib/tempfile.py
@@ -19,6 +19,7 @@ This module also provides some data items to the user:
__all__ = [
"NamedTemporaryFile", "TemporaryFile", # high level safe interfaces
+ "SpooledTemporaryFile",
"mkstemp", "mkdtemp", # low level safe interfaces
"mktemp", # deprecated unsafe interface
"TMP_MAX", "gettempprefix", # constants
@@ -37,6 +38,11 @@ if _os.name == 'mac':
import Carbon.Folders as _Folders
try:
+ from cStringIO import StringIO as _StringIO
+except:
+ from StringIO import StringIO as _StringIO
+
+try:
import fcntl as _fcntl
except ImportError:
def _set_cloexec(fd):
@@ -473,3 +479,111 @@ else:
except:
_os.close(fd)
raise
+
+class SpooledTemporaryFile:
+ """Temporary file wrapper, specialized to switch from
+ StringIO to a real file when it exceeds a certain size or
+ when a fileno is needed.
+ """
+ _rolled = False
+
+ def __init__(self, max_size=0, mode='w+b', bufsize=-1,
+ suffix="", prefix=template, dir=None):
+ self._file = _StringIO()
+ self._max_size = max_size
+ self._rolled = False
+ self._TemporaryFileArgs = (mode, bufsize, suffix, prefix, dir)
+
+ def _check(self, file):
+ if self._rolled: return
+ max_size = self._max_size
+ if max_size and file.tell() > max_size:
+ self.rollover()
+
+ def rollover(self):
+ if self._rolled: return
+ file = self._file
+ newfile = self._file = TemporaryFile(*self._TemporaryFileArgs)
+ del self._TemporaryFileArgs
+
+ newfile.write(file.getvalue())
+ newfile.seek(file.tell(), 0)
+
+ self._rolled = True
+
+ # file protocol
+ def __iter__(self):
+ return self._file.__iter__()
+
+ def close(self):
+ self._file.close()
+
+ @property
+ def closed(self):
+ return self._file.closed
+
+ @property
+ def encoding(self):
+ return self._file.encoding
+
+ def fileno(self):
+ self.rollover()
+ return self._file.fileno()
+
+ def flush(self):
+ self._file.flush()
+
+ def isatty(self):
+ return self._file.isatty()
+
+ @property
+ def mode(self):
+ return self._file.mode
+
+ @property
+ def name(self):
+ return self._file.name
+
+ @property
+ def newlines(self):
+ return self._file.newlines
+
+ def next(self):
+ return self._file.next
+
+ def read(self, *args):
+ return self._file.read(*args)
+
+ def readline(self, *args):
+ return self._file.readline(*args)
+
+ def readlines(self, *args):
+ return self._file.readlines(*args)
+
+ def seek(self, *args):
+ self._file.seek(*args)
+
+ @property
+ def softspace(self):
+ return self._file.softspace
+
+ def tell(self):
+ return self._file.tell()
+
+ def truncate(self):
+ self._file.truncate()
+
+ def write(self, s):
+ file = self._file
+ rv = file.write(s)
+ self._check(file)
+ return rv
+
+ def writelines(self, iterable):
+ file = self._file
+ rv = file.writelines(iterable)
+ self._check(file)
+ return rv
+
+ def xreadlines(self, *args):
+ return self._file.xreadlines(*args)
diff --git a/Lib/test/test_tempfile.py b/Lib/test/test_tempfile.py
index cfb24e2228..68c2728dca 100644
--- a/Lib/test/test_tempfile.py
+++ b/Lib/test/test_tempfile.py
@@ -81,7 +81,8 @@ class test_exports(TC):
"gettempprefix" : 1,
"gettempdir" : 1,
"tempdir" : 1,
- "template" : 1
+ "template" : 1,
+ "SpooledTemporaryFile" : 1
}
unexp = []
@@ -632,6 +633,107 @@ class test_NamedTemporaryFile(TC):
test_classes.append(test_NamedTemporaryFile)
+class test_SpooledTemporaryFile(TC):
+ """Test SpooledTemporaryFile()."""
+
+ def do_create(self, max_size=0, dir=None, pre="", suf=""):
+ if dir is None:
+ dir = tempfile.gettempdir()
+ try:
+ file = tempfile.SpooledTemporaryFile(max_size=max_size, dir=dir, prefix=pre, suffix=suf)
+ except:
+ self.failOnException("SpooledTemporaryFile")
+
+ return file
+
+
+ def test_basic(self):
+ # SpooledTemporaryFile can create files
+ f = self.do_create()
+ self.failIf(f._rolled)
+ f = self.do_create(max_size=100, pre="a", suf=".txt")
+ self.failIf(f._rolled)
+
+ def test_del_on_close(self):
+ # A SpooledTemporaryFile is deleted when closed
+ dir = tempfile.mkdtemp()
+ try:
+ f = tempfile.SpooledTemporaryFile(max_size=10, dir=dir)
+ self.failIf(f._rolled)
+ f.write('blat ' * 5)
+ self.failUnless(f._rolled)
+ filename = f.name
+ f.close()
+ self.failIf(os.path.exists(filename),
+ "SpooledTemporaryFile %s exists after close" % filename)
+ finally:
+ os.rmdir(dir)
+
+ def test_rewrite_small(self):
+ # A SpooledTemporaryFile can be written to multiple within the max_size
+ f = self.do_create(max_size=30)
+ self.failIf(f._rolled)
+ for i in range(5):
+ f.seek(0, 0)
+ f.write('x' * 20)
+ self.failIf(f._rolled)
+
+ def test_write_sequential(self):
+ # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
+ # over afterward
+ f = self.do_create(max_size=30)
+ self.failIf(f._rolled)
+ f.write('x' * 20)
+ self.failIf(f._rolled)
+ f.write('x' * 10)
+ self.failIf(f._rolled)
+ f.write('x')
+ self.failUnless(f._rolled)
+
+ def test_sparse(self):
+ # A SpooledTemporaryFile that is written late in the file will extend
+ # when that occurs
+ f = self.do_create(max_size=30)
+ self.failIf(f._rolled)
+ f.seek(100, 0)
+ self.failIf(f._rolled)
+ f.write('x')
+ self.failUnless(f._rolled)
+
+ def test_fileno(self):
+ # A SpooledTemporaryFile should roll over to a real file on fileno()
+ f = self.do_create(max_size=30)
+ self.failIf(f._rolled)
+ self.failUnless(f.fileno() > 0)
+ self.failUnless(f._rolled)
+
+ def test_multiple_close(self):
+ # A SpooledTemporaryFile can be closed many times without error
+ f = tempfile.SpooledTemporaryFile()
+ f.write('abc\n')
+ f.close()
+ try:
+ f.close()
+ f.close()
+ except:
+ self.failOnException("close")
+
+ def test_bound_methods(self):
+ # It should be OK to steal a bound method from a SpooledTemporaryFile
+ # and use it independently; when the file rolls over, those bound
+ # methods should continue to function
+ f = self.do_create(max_size=30)
+ read = f.read
+ write = f.write
+ seek = f.seek
+
+ write("a" * 35)
+ write("b" * 35)
+ seek(0, 0)
+ self.failUnless(read(70) == 'a'*35 + 'b'*35)
+
+test_classes.append(test_SpooledTemporaryFile)
+
class test_TemporaryFile(TC):
"""Test TemporaryFile()."""