summaryrefslogtreecommitdiff
path: root/Lib/test/test_hashlib.py
diff options
context:
space:
mode:
Diffstat (limited to 'Lib/test/test_hashlib.py')
-rw-r--r--Lib/test/test_hashlib.py182
1 files changed, 169 insertions, 13 deletions
diff --git a/Lib/test/test_hashlib.py b/Lib/test/test_hashlib.py
index 52c50f534b..855ecbfb4e 100644
--- a/Lib/test/test_hashlib.py
+++ b/Lib/test/test_hashlib.py
@@ -6,11 +6,23 @@
# Licensed to PSF under a Contributor Agreement.
#
+import array
import hashlib
+import itertools
+import sys
+try:
+ import threading
+except ImportError:
+ threading = None
import unittest
+import warnings
from test import test_support
from test.test_support import _4G, precisionbigmemtest
+# Were we compiled --with-pydebug or with #define Py_DEBUG?
+COMPILED_WITH_PYDEBUG = hasattr(sys, 'gettotalrefcount')
+
+
def hexstr(s):
import string
h = string.hexdigits
@@ -26,24 +38,111 @@ class HashLibTestCase(unittest.TestCase):
'sha224', 'SHA224', 'sha256', 'SHA256',
'sha384', 'SHA384', 'sha512', 'SHA512' )
+ _warn_on_extension_import = COMPILED_WITH_PYDEBUG
+
+ def _conditional_import_module(self, module_name):
+ """Import a module and return a reference to it or None on failure."""
+ try:
+ exec('import '+module_name)
+ except ImportError, error:
+ if self._warn_on_extension_import:
+ warnings.warn('Did a C extension fail to compile? %s' % error)
+ return locals().get(module_name)
+
+ def __init__(self, *args, **kwargs):
+ algorithms = set()
+ for algorithm in self.supported_hash_names:
+ algorithms.add(algorithm.lower())
+ self.constructors_to_test = {}
+ for algorithm in algorithms:
+ self.constructors_to_test[algorithm] = set()
+
+ # For each algorithm, test the direct constructor and the use
+ # of hashlib.new given the algorithm name.
+ for algorithm, constructors in self.constructors_to_test.items():
+ constructors.add(getattr(hashlib, algorithm))
+ def _test_algorithm_via_hashlib_new(data=None, _alg=algorithm):
+ if data is None:
+ return hashlib.new(_alg)
+ return hashlib.new(_alg, data)
+ constructors.add(_test_algorithm_via_hashlib_new)
+
+ _hashlib = self._conditional_import_module('_hashlib')
+ if _hashlib:
+ # These two algorithms should always be present when this module
+ # is compiled. If not, something was compiled wrong.
+ assert hasattr(_hashlib, 'openssl_md5')
+ assert hasattr(_hashlib, 'openssl_sha1')
+ for algorithm, constructors in self.constructors_to_test.items():
+ constructor = getattr(_hashlib, 'openssl_'+algorithm, None)
+ if constructor:
+ constructors.add(constructor)
+
+ _md5 = self._conditional_import_module('_md5')
+ if _md5:
+ self.constructors_to_test['md5'].add(_md5.new)
+ _sha = self._conditional_import_module('_sha')
+ if _sha:
+ self.constructors_to_test['sha1'].add(_sha.new)
+ _sha256 = self._conditional_import_module('_sha256')
+ if _sha256:
+ self.constructors_to_test['sha224'].add(_sha256.sha224)
+ self.constructors_to_test['sha256'].add(_sha256.sha256)
+ _sha512 = self._conditional_import_module('_sha512')
+ if _sha512:
+ self.constructors_to_test['sha384'].add(_sha512.sha384)
+ self.constructors_to_test['sha512'].add(_sha512.sha512)
+
+ super(HashLibTestCase, self).__init__(*args, **kwargs)
+
+ def test_hash_array(self):
+ a = array.array("b", range(10))
+ constructors = self.constructors_to_test.itervalues()
+ for cons in itertools.chain.from_iterable(constructors):
+ c = cons(a)
+ c.hexdigest()
+
+ def test_algorithms_attribute(self):
+ self.assertEqual(hashlib.algorithms,
+ tuple([_algo for _algo in self.supported_hash_names if
+ _algo.islower()]))
+
def test_unknown_hash(self):
try:
hashlib.new('spam spam spam spam spam')
except ValueError:
pass
else:
- self.assert_(0 == "hashlib didn't reject bogus hash name")
+ self.assertTrue(0 == "hashlib didn't reject bogus hash name")
+
+ def test_get_builtin_constructor(self):
+ get_builtin_constructor = hashlib.__dict__[
+ '__get_builtin_constructor']
+ self.assertRaises(ValueError, get_builtin_constructor, 'test')
+ try:
+ import _md5
+ except ImportError:
+ pass
+ # This forces an ImportError for "import _md5" statements
+ sys.modules['_md5'] = None
+ try:
+ self.assertRaises(ValueError, get_builtin_constructor, 'md5')
+ finally:
+ if '_md5' in locals():
+ sys.modules['_md5'] = _md5
+ else:
+ del sys.modules['_md5']
def test_hexdigest(self):
for name in self.supported_hash_names:
h = hashlib.new(name)
- self.assert_(hexstr(h.digest()) == h.hexdigest())
-
+ self.assertTrue(hexstr(h.digest()) == h.hexdigest())
def test_large_update(self):
aas = 'a' * 128
bees = 'b' * 127
cees = 'c' * 126
+ abcs = aas + bees + cees
for name in self.supported_hash_names:
m1 = hashlib.new(name)
@@ -52,17 +151,39 @@ class HashLibTestCase(unittest.TestCase):
m1.update(cees)
m2 = hashlib.new(name)
- m2.update(aas + bees + cees)
- self.assertEqual(m1.digest(), m2.digest())
+ m2.update(abcs)
+ self.assertEqual(m1.digest(), m2.digest(), name+' update problem.')
- def check(self, name, data, digest):
- # test the direct constructors
- computed = getattr(hashlib, name)(data).hexdigest()
- self.assert_(computed == digest)
- # test the general new() interface
- computed = hashlib.new(name, data).hexdigest()
- self.assert_(computed == digest)
+ m3 = hashlib.new(name, abcs)
+ self.assertEqual(m1.digest(), m3.digest(), name+' new problem.')
+ def check(self, name, data, digest):
+ constructors = self.constructors_to_test[name]
+ # 2 is for hashlib.name(...) and hashlib.new(name, ...)
+ self.assertGreaterEqual(len(constructors), 2)
+ for hash_object_constructor in constructors:
+ computed = hash_object_constructor(data).hexdigest()
+ self.assertEqual(
+ computed, digest,
+ "Hash algorithm %s constructed using %s returned hexdigest"
+ " %r for %d byte input data that should have hashed to %r."
+ % (name, hash_object_constructor,
+ computed, len(data), digest))
+
+ def check_unicode(self, algorithm_name):
+ # Unicode objects are not allowed as input.
+ expected = hashlib.new(algorithm_name, str(u'spam')).hexdigest()
+ self.check(algorithm_name, u'spam', expected)
+
+ def test_unicode(self):
+ # In python 2.x unicode is auto-encoded to the system default encoding
+ # when passed to hashlib functions.
+ self.check_unicode('md5')
+ self.check_unicode('sha1')
+ self.check_unicode('sha224')
+ self.check_unicode('sha256')
+ self.check_unicode('sha384')
+ self.check_unicode('sha512')
def test_case_md5_0(self):
self.check('md5', '', 'd41d8cd98f00b204e9800998ecf8427e')
@@ -196,10 +317,45 @@ class HashLibTestCase(unittest.TestCase):
"e718483d0ce769644e2e42c7bc15b4638e1f98b13b2044285632a803afa973eb"+
"de0ff244877ea60a4cb0432ce577c31beb009c5c2c49aa2e4eadb217ad8cc09b")
+ @unittest.skipUnless(threading, 'Threading required for this test.')
+ @test_support.reap_threads
+ def test_threaded_hashing(self):
+ # Updating the same hash object from several threads at once
+ # using data chunk sizes containing the same byte sequences.
+ #
+ # If the internal locks are working to prevent multiple
+ # updates on the same object from running at once, the resulting
+ # hash will be the same as doing it single threaded upfront.
+ hasher = hashlib.sha1()
+ num_threads = 5
+ smallest_data = 'swineflu'
+ data = smallest_data*200000
+ expected_hash = hashlib.sha1(data*num_threads).hexdigest()
+
+ def hash_in_chunks(chunk_size, event):
+ index = 0
+ while index < len(data):
+ hasher.update(data[index:index+chunk_size])
+ index += chunk_size
+ event.set()
+
+ events = []
+ for threadnum in xrange(num_threads):
+ chunk_size = len(data) // (10**threadnum)
+ assert chunk_size > 0
+ assert chunk_size % len(smallest_data) == 0
+ event = threading.Event()
+ events.append(event)
+ threading.Thread(target=hash_in_chunks,
+ args=(chunk_size, event)).start()
+
+ for event in events:
+ event.wait()
+
+ self.assertEqual(expected_hash, hasher.hexdigest())
def test_main():
test_support.run_unittest(HashLibTestCase)
-
if __name__ == "__main__":
test_main()