diff options
Diffstat (limited to 'Lib/test/pickletester.py')
-rw-r--r-- | Lib/test/pickletester.py | 234 |
1 files changed, 209 insertions, 25 deletions
diff --git a/Lib/test/pickletester.py b/Lib/test/pickletester.py index 2454af13ed..c9e74b857f 100644 --- a/Lib/test/pickletester.py +++ b/Lib/test/pickletester.py @@ -2,11 +2,11 @@ import unittest import pickle import cPickle import StringIO +import cStringIO import pickletools import copy_reg -from test.test_support import TestFailed, have_unicode, TESTFN, \ - run_with_locale +from test.test_support import TestFailed, have_unicode, TESTFN # Tests that try a number of pickle protocols should have a # for proto in protocols: @@ -14,6 +14,42 @@ from test.test_support import TestFailed, have_unicode, TESTFN, \ assert pickle.HIGHEST_PROTOCOL == cPickle.HIGHEST_PROTOCOL == 2 protocols = range(pickle.HIGHEST_PROTOCOL + 1) +# Copy of test.test_support.run_with_locale. This is needed to support Python +# 2.4, which didn't include it. This is all to support test_xpickle, which +# bounces pickled objects through older Python versions to test backwards +# compatibility. +def run_with_locale(catstr, *locales): + def decorator(func): + def inner(*args, **kwds): + try: + import locale + category = getattr(locale, catstr) + orig_locale = locale.setlocale(category) + except AttributeError: + # if the test author gives us an invalid category string + raise + except: + # cannot retrieve original locale, so do nothing + locale = orig_locale = None + else: + for loc in locales: + try: + locale.setlocale(category, loc) + break + except: + pass + + # now run the function, resetting the locale on exceptions + try: + return func(*args, **kwds) + finally: + if locale and orig_locale: + locale.setlocale(category, orig_locale) + inner.func_name = func.func_name + inner.__doc__ = func.__doc__ + return inner + return decorator + # Return True if opcode code appears in the pickle, else False. def opcode_in_pickle(code, pickle): @@ -410,12 +446,11 @@ class AbstractPickleTests(unittest.TestCase): # is a mystery. cPickle also suppresses PUT for objects with a refcount # of 1. def dont_test_disassembly(self): - from cStringIO import StringIO from pickletools import dis for proto, expected in (0, DATA0_DIS), (1, DATA1_DIS): s = self.dumps(self._testdata, proto) - filelike = StringIO() + filelike = cStringIO.StringIO() dis(s, out=filelike) got = filelike.getvalue() self.assertEqual(expected, got) @@ -427,7 +462,7 @@ class AbstractPickleTests(unittest.TestCase): s = self.dumps(l, proto) x = self.loads(s) self.assertEqual(len(x), 1) - self.assert_(x is x[0]) + self.assertTrue(x is x[0]) def test_recursive_tuple(self): t = ([],) @@ -437,7 +472,7 @@ class AbstractPickleTests(unittest.TestCase): x = self.loads(s) self.assertEqual(len(x), 1) self.assertEqual(len(x[0]), 1) - self.assert_(x is x[0][0]) + self.assertTrue(x is x[0][0]) def test_recursive_dict(self): d = {} @@ -446,7 +481,7 @@ class AbstractPickleTests(unittest.TestCase): s = self.dumps(d, proto) x = self.loads(s) self.assertEqual(x.keys(), [1]) - self.assert_(x[1] is x) + self.assertTrue(x[1] is x) def test_recursive_inst(self): i = C() @@ -455,7 +490,7 @@ class AbstractPickleTests(unittest.TestCase): s = self.dumps(i, 2) x = self.loads(s) self.assertEqual(dir(x), dir(i)) - self.assert_(x.attr is x) + self.assertTrue(x.attr is x) def test_recursive_multi(self): l = [] @@ -469,7 +504,7 @@ class AbstractPickleTests(unittest.TestCase): self.assertEqual(len(x), 1) self.assertEqual(dir(x[0]), dir(i)) self.assertEqual(x[0].attr.keys(), [1]) - self.assert_(x[0].attr[1] is x) + self.assertTrue(x[0].attr[1] is x) def test_garyp(self): self.assertRaises(self.error, self.loads, 'garyp') @@ -610,7 +645,7 @@ class AbstractPickleTests(unittest.TestCase): try: self.loads(badpickle) except ValueError, detail: - self.failUnless(str(detail).startswith( + self.assertTrue(str(detail).startswith( "unsupported pickle protocol")) else: self.fail("expected bad protocol number to raise ValueError") @@ -682,7 +717,7 @@ class AbstractPickleTests(unittest.TestCase): for x in None, False, True: s = self.dumps(x, proto) y = self.loads(s) - self.assert_(x is y, (proto, x, s, y)) + self.assertTrue(x is y, (proto, x, s, y)) expected = expected_opcode[proto, x] self.assertEqual(opcode_in_pickle(expected, s), True) @@ -732,8 +767,8 @@ class AbstractPickleTests(unittest.TestCase): # Dump using protocol 1 for comparison. s1 = self.dumps(x, 1) - self.assert_(__name__ in s1) - self.assert_("MyList" in s1) + self.assertIn(__name__, s1) + self.assertIn("MyList", s1) self.assertEqual(opcode_in_pickle(opcode, s1), False) y = self.loads(s1) @@ -742,8 +777,8 @@ class AbstractPickleTests(unittest.TestCase): # Dump using protocol 2 for test. s2 = self.dumps(x, 2) - self.assert_(__name__ not in s2) - self.assert_("MyList" not in s2) + self.assertNotIn(__name__, s2) + self.assertNotIn("MyList", s2) self.assertEqual(opcode_in_pickle(opcode, s2), True) y = self.loads(s2) @@ -787,7 +822,7 @@ class AbstractPickleTests(unittest.TestCase): if proto == 0: self.assertEqual(num_appends, 0) else: - self.failUnless(num_appends >= 2) + self.assertTrue(num_appends >= 2) def test_dict_chunking(self): n = 10 # too small to chunk @@ -809,7 +844,7 @@ class AbstractPickleTests(unittest.TestCase): if proto == 0: self.assertEqual(num_setitems, 0) else: - self.failUnless(num_setitems >= 2) + self.assertTrue(num_setitems >= 2) def test_simple_newobj(self): x = object.__new__(SimpleNewObj) # avoid __init__ @@ -833,7 +868,7 @@ class AbstractPickleTests(unittest.TestCase): self.assertEqual(x.bar, y.bar) def test_reduce_overrides_default_reduce_ex(self): - for proto in 0, 1, 2: + for proto in protocols: x = REX_one() self.assertEqual(x._reduce_called, 0) s = self.dumps(x, proto) @@ -842,7 +877,7 @@ class AbstractPickleTests(unittest.TestCase): self.assertEqual(y._reduce_called, 0) def test_reduce_ex_called(self): - for proto in 0, 1, 2: + for proto in protocols: x = REX_two() self.assertEqual(x._proto, None) s = self.dumps(x, proto) @@ -851,7 +886,7 @@ class AbstractPickleTests(unittest.TestCase): self.assertEqual(y._proto, None) def test_reduce_ex_overrides_reduce(self): - for proto in 0, 1, 2: + for proto in protocols: x = REX_three() self.assertEqual(x._proto, None) s = self.dumps(x, proto) @@ -860,7 +895,7 @@ class AbstractPickleTests(unittest.TestCase): self.assertEqual(y._proto, None) def test_reduce_ex_calls_base(self): - for proto in 0, 1, 2: + for proto in protocols: x = REX_four() self.assertEqual(x._proto, None) s = self.dumps(x, proto) @@ -869,7 +904,7 @@ class AbstractPickleTests(unittest.TestCase): self.assertEqual(y._proto, proto) def test_reduce_calls_base(self): - for proto in 0, 1, 2: + for proto in protocols: x = REX_five() self.assertEqual(x._reduce_called, 0) s = self.dumps(x, proto) @@ -890,7 +925,7 @@ class AbstractPickleTests(unittest.TestCase): return dict, (), None, None, [] # Protocol 0 is less strict and also accept iterables. - for proto in 0, 1, 2: + for proto in protocols: try: self.dumps(C(), proto) except (AttributeError, pickle.PickleError, cPickle.PickleError): @@ -900,6 +935,35 @@ class AbstractPickleTests(unittest.TestCase): except (AttributeError, pickle.PickleError, cPickle.PickleError): pass + def test_many_puts_and_gets(self): + # Test that internal data structures correctly deal with lots of + # puts/gets. + keys = ("aaa" + str(i) for i in xrange(100)) + large_dict = dict((k, [4, 5, 6]) for k in keys) + obj = [dict(large_dict), dict(large_dict), dict(large_dict)] + + for proto in protocols: + dumped = self.dumps(obj, proto) + loaded = self.loads(dumped) + self.assertEqual(loaded, obj, + "Failed protocol %d: %r != %r" + % (proto, obj, loaded)) + + def test_attribute_name_interning(self): + # Test that attribute names of pickled objects are interned when + # unpickling. + for proto in protocols: + x = C() + x.foo = 42 + x.bar = "hello" + s = self.dumps(x, proto) + y = self.loads(s) + x_keys = sorted(x.__dict__) + y_keys = sorted(y.__dict__) + for x_key, y_key in zip(x_keys, y_keys): + self.assertIs(x_key, y_key) + + # Test classes for reduce_ex class REX_one(object): @@ -1001,13 +1065,20 @@ class AbstractPickleModuleTests(unittest.TestCase): finally: os.remove(TESTFN) + def test_load_from_and_dump_to_file(self): + stream = cStringIO.StringIO() + data = [123, {}, 124] + self.module.dump(data, stream) + stream.seek(0) + unpickled = self.module.load(stream) + self.assertEqual(unpickled, data) + def test_highest_protocol(self): # Of course this needs to be changed when HIGHEST_PROTOCOL changes. self.assertEqual(self.module.HIGHEST_PROTOCOL, 2) def test_callapi(self): - from cStringIO import StringIO - f = StringIO() + f = cStringIO.StringIO() # With and without keyword arguments self.module.dump(123, f, -1) self.module.dump(123, file=f, protocol=-1) @@ -1073,3 +1144,116 @@ class AbstractPersistentPicklerTests(unittest.TestCase): self.assertEqual(self.loads(self.dumps(L, 1)), L) self.assertEqual(self.id_count, 5) self.assertEqual(self.load_count, 5) + +class AbstractPicklerUnpicklerObjectTests(unittest.TestCase): + + pickler_class = None + unpickler_class = None + + def setUp(self): + assert self.pickler_class + assert self.unpickler_class + + def test_clear_pickler_memo(self): + # To test whether clear_memo() has any effect, we pickle an object, + # then pickle it again without clearing the memo; the two serialized + # forms should be different. If we clear_memo() and then pickle the + # object again, the third serialized form should be identical to the + # first one we obtained. + data = ["abcdefg", "abcdefg", 44] + f = cStringIO.StringIO() + pickler = self.pickler_class(f) + + pickler.dump(data) + first_pickled = f.getvalue() + + # Reset StringIO object. + f.seek(0) + f.truncate() + + pickler.dump(data) + second_pickled = f.getvalue() + + # Reset the Pickler and StringIO objects. + pickler.clear_memo() + f.seek(0) + f.truncate() + + pickler.dump(data) + third_pickled = f.getvalue() + + self.assertNotEqual(first_pickled, second_pickled) + self.assertEqual(first_pickled, third_pickled) + + def test_priming_pickler_memo(self): + # Verify that we can set the Pickler's memo attribute. + data = ["abcdefg", "abcdefg", 44] + f = cStringIO.StringIO() + pickler = self.pickler_class(f) + + pickler.dump(data) + first_pickled = f.getvalue() + + f = cStringIO.StringIO() + primed = self.pickler_class(f) + primed.memo = pickler.memo + + primed.dump(data) + primed_pickled = f.getvalue() + + self.assertNotEqual(first_pickled, primed_pickled) + + def test_priming_unpickler_memo(self): + # Verify that we can set the Unpickler's memo attribute. + data = ["abcdefg", "abcdefg", 44] + f = cStringIO.StringIO() + pickler = self.pickler_class(f) + + pickler.dump(data) + first_pickled = f.getvalue() + + f = cStringIO.StringIO() + primed = self.pickler_class(f) + primed.memo = pickler.memo + + primed.dump(data) + primed_pickled = f.getvalue() + + unpickler = self.unpickler_class(cStringIO.StringIO(first_pickled)) + unpickled_data1 = unpickler.load() + + self.assertEqual(unpickled_data1, data) + + primed = self.unpickler_class(cStringIO.StringIO(primed_pickled)) + primed.memo = unpickler.memo + unpickled_data2 = primed.load() + + primed.memo.clear() + + self.assertEqual(unpickled_data2, data) + self.assertTrue(unpickled_data2 is unpickled_data1) + + def test_reusing_unpickler_objects(self): + data1 = ["abcdefg", "abcdefg", 44] + f = cStringIO.StringIO() + pickler = self.pickler_class(f) + pickler.dump(data1) + pickled1 = f.getvalue() + + data2 = ["abcdefg", 44, 44] + f = cStringIO.StringIO() + pickler = self.pickler_class(f) + pickler.dump(data2) + pickled2 = f.getvalue() + + f = cStringIO.StringIO() + f.write(pickled1) + f.seek(0) + unpickler = self.unpickler_class(f) + self.assertEqual(unpickler.load(), data1) + + f.seek(0) + f.truncate() + f.write(pickled2) + f.seek(0) + self.assertEqual(unpickler.load(), data2) |