diff options
Diffstat (limited to 'passlib/tests')
-rw-r--r-- | passlib/tests/sample1.cfg | 9 | ||||
-rw-r--r-- | passlib/tests/sample1b.cfg | 9 | ||||
-rw-r--r-- | passlib/tests/sample1c.cfg | bin | 0 -> 490 bytes | |||
-rw-r--r-- | passlib/tests/test_apache.py | 347 | ||||
-rw-r--r-- | passlib/tests/test_apps.py | 8 | ||||
-rw-r--r-- | passlib/tests/test_context.py | 1737 | ||||
-rw-r--r-- | passlib/tests/test_context_deprecated.py | 1165 | ||||
-rw-r--r-- | passlib/tests/test_ext_django.py | 144 | ||||
-rw-r--r-- | passlib/tests/test_handlers.py | 272 | ||||
-rw-r--r-- | passlib/tests/test_hosts.py | 2 | ||||
-rw-r--r-- | passlib/tests/test_registry.py | 2 | ||||
-rw-r--r-- | passlib/tests/test_utils.py | 514 | ||||
-rw-r--r-- | passlib/tests/test_utils_crypto.py | 550 | ||||
-rw-r--r-- | passlib/tests/tox_support.py | 37 | ||||
-rw-r--r-- | passlib/tests/utils.py | 197 |
15 files changed, 3505 insertions, 1488 deletions
diff --git a/passlib/tests/sample1.cfg b/passlib/tests/sample1.cfg new file mode 100644 index 0000000..c90ba83 --- /dev/null +++ b/passlib/tests/sample1.cfg @@ -0,0 +1,9 @@ +[passlib] +schemes = des_crypt, md5_crypt, bsdi_crypt, sha512_crypt +default = md5_crypt +all__vary_rounds = 0.1 +bsdi_crypt__default_rounds = 25000 +bsdi_crypt__max_rounds = 30000 +sha512_crypt__max_rounds = 50000 +sha512_crypt__min_rounds = 40000 + diff --git a/passlib/tests/sample1b.cfg b/passlib/tests/sample1b.cfg new file mode 100644 index 0000000..1cb4fd1 --- /dev/null +++ b/passlib/tests/sample1b.cfg @@ -0,0 +1,9 @@ +[passlib]
+schemes = des_crypt, md5_crypt, bsdi_crypt, sha512_crypt
+default = md5_crypt
+all__vary_rounds = 0.1
+bsdi_crypt__default_rounds = 25000
+bsdi_crypt__max_rounds = 30000
+sha512_crypt__max_rounds = 50000
+sha512_crypt__min_rounds = 40000
+
diff --git a/passlib/tests/sample1c.cfg b/passlib/tests/sample1c.cfg Binary files differnew file mode 100644 index 0000000..c58ce0e --- /dev/null +++ b/passlib/tests/sample1c.cfg diff --git a/passlib/tests/test_apache.py b/passlib/tests/test_apache.py index d3b4ab8..f05c05b 100644 --- a/passlib/tests/test_apache.py +++ b/passlib/tests/test_apache.py @@ -32,10 +32,23 @@ class HtpasswdFileTest(TestCase): "test HtpasswdFile class" descriptionPrefix = "HtpasswdFile" - sample_01 = b('user2:2CHkkwa2AtqGs\nuser3:{SHA}3ipNV1GrBtxPmHFC21fCbVCSXIo=\nuser4:pass4\nuser1:$apr1$t4tc7jTh$GPIWVUo8sQKJlUdV8V5vu0\n') + # sample with 4 users + sample_01 = b('user2:2CHkkwa2AtqGs\n' + 'user3:{SHA}3ipNV1GrBtxPmHFC21fCbVCSXIo=\n' + 'user4:pass4\n' + 'user1:$apr1$t4tc7jTh$GPIWVUo8sQKJlUdV8V5vu0\n') + + # sample 1 with user 1, 2 deleted; 4 changed sample_02 = b('user3:{SHA}3ipNV1GrBtxPmHFC21fCbVCSXIo=\nuser4:pass4\n') - sample_03 = b('user2:pass2x\nuser3:{SHA}3ipNV1GrBtxPmHFC21fCbVCSXIo=\nuser4:pass4\nuser1:$apr1$t4tc7jTh$GPIWVUo8sQKJlUdV8V5vu0\nuser5:pass5\n') + # sample 1 with user2 updated, user 1 first entry removed, and user 5 added + sample_03 = b('user2:pass2x\n' + 'user3:{SHA}3ipNV1GrBtxPmHFC21fCbVCSXIo=\n' + 'user4:pass4\n' + 'user1:$apr1$t4tc7jTh$GPIWVUo8sQKJlUdV8V5vu0\n' + 'user5:pass5\n') + + # standalone sample with 8-bit username sample_04_utf8 = b('user\xc3\xa6:2CHkkwa2AtqGs\n') sample_04_latin1 = b('user\xe6:2CHkkwa2AtqGs\n') @@ -46,60 +59,93 @@ class HtpasswdFileTest(TestCase): if gae_env: return self.skipTest("GAE doesn't offer read/write filesystem access") - #check with existing file + # check with existing file path = mktemp() set_file(path, self.sample_01) ht = apache.HtpasswdFile(path) self.assertEqual(ht.to_string(), self.sample_01) - #check autoload=False - ht = apache.HtpasswdFile(path, autoload=False) + # check without autoload + ht = apache.HtpasswdFile(path, new=True) self.assertEqual(ht.to_string(), b("")) - #check missing file + # check missing file os.remove(path) self.assertRaises(IOError, apache.HtpasswdFile, path) - #NOTE: "default" option checked via update() test, among others + #NOTE: "default_scheme" option checked via set_password() test, among others def test_01_delete(self): "test delete()" - ht = apache.HtpasswdFile._from_string(self.sample_01) - self.assertTrue(ht.delete("user1")) + ht = apache.HtpasswdFile.from_string(self.sample_01) + self.assertTrue(ht.delete("user1")) # should delete both entries self.assertTrue(ht.delete("user2")) - self.assertTrue(not ht.delete("user5")) + self.assertFalse(ht.delete("user5")) # user not present self.assertEqual(ht.to_string(), self.sample_02) + # invalid user self.assertRaises(ValueError, ht.delete, "user:") - def test_02_update(self): - "test update()" - ht = apache.HtpasswdFile._from_string( - self.sample_01, default="plaintext") - self.assertTrue(ht.update("user2", "pass2x")) - self.assertTrue(not ht.update("user5", "pass5")) + def test_01_delete_autosave(self): + if gae_env: + return self.skipTest("GAE doesn't offer read/write filesystem access") + path = mktemp() + sample = b('user1:pass1\nuser2:pass2\n') + set_file(path, sample) + + ht = apache.HtpasswdFile(path) + ht.delete("user1") + self.assertEqual(get_file(path), sample) + + ht = apache.HtpasswdFile(path, autosave=True) + ht.delete("user1") + self.assertEqual(get_file(path), b("user2:pass2\n")) + + def test_02_set_password(self): + "test set_password()" + ht = apache.HtpasswdFile.from_string( + self.sample_01, default_scheme="plaintext") + self.assertTrue(ht.set_password("user2", "pass2x")) + self.assertFalse(ht.set_password("user5", "pass5")) self.assertEqual(ht.to_string(), self.sample_03) - self.assertRaises(ValueError, ht.update, "user:", "pass") + # invalid user + self.assertRaises(ValueError, ht.set_password, "user:", "pass") + + def test_02_set_password_autosave(self): + if gae_env: + return self.skipTest("GAE doesn't offer read/write filesystem access") + path = mktemp() + sample = b('user1:pass1\n') + set_file(path, sample) + + ht = apache.HtpasswdFile(path) + ht.set_password("user1", "pass2") + self.assertEqual(get_file(path), sample) + + ht = apache.HtpasswdFile(path, default_scheme="plaintext", autosave=True) + ht.set_password("user1", "pass2") + self.assertEqual(get_file(path), b("user1:pass2\n")) def test_03_users(self): "test users()" - ht = apache.HtpasswdFile._from_string(self.sample_01) - ht.update("user5", "pass5") + ht = apache.HtpasswdFile.from_string(self.sample_01) + ht.set_password("user5", "pass5") ht.delete("user3") - ht.update("user3", "pass3") - self.assertEqual(ht.users(), ["user2", "user4", "user1", "user5", "user3"]) - - def test_04_verify(self): - "test verify()" - ht = apache.HtpasswdFile._from_string(self.sample_01) - self.assertTrue(ht.verify("user5","pass5") is None) + ht.set_password("user3", "pass3") + self.assertEqual(ht.users(), ["user2", "user4", "user1", "user5", + "user3"]) + + def test_04_check_password(self): + "test check_password()" + ht = apache.HtpasswdFile.from_string(self.sample_01) + self.assertTrue(ht.check_password("user5","pass5") is None) for i in irange(1,5): i = str(i) - self.assertTrue(ht.verify("user"+i, "pass"+i)) - self.assertTrue(ht.verify("user"+i, "pass5") is False) + self.assertTrue(ht.check_password("user"+i, "pass"+i)) + self.assertTrue(ht.check_password("user"+i, "pass5") is False) - self.assertRaises(ValueError, ht.verify, "user:", "pass") + self.assertRaises(ValueError, ht.check_password, "user:", "pass") def test_05_load(self): "test load()" @@ -110,33 +156,36 @@ class HtpasswdFileTest(TestCase): path = mktemp() set_file(path, "") backdate_file_mtime(path, 5) - ha = apache.HtpasswdFile(path, default="plaintext") + ha = apache.HtpasswdFile(path, default_scheme="plaintext") self.assertEqual(ha.to_string(), b("")) - #make changes, check force=False does nothing - ha.update("user1", "pass1") - ha.load(force=False) + #make changes, check load_if_changed() does nothing + ha.set_password("user1", "pass1") + ha.load_if_changed() self.assertEqual(ha.to_string(), b("user1:pass1\n")) #change file set_file(path, self.sample_01) - ha.load(force=False) + ha.load_if_changed() self.assertEqual(ha.to_string(), self.sample_01) - #make changes, check force=True overwrites them - ha.update("user5", "pass5") + #make changes, check load() overwrites them + ha.set_password("user5", "pass5") ha.load() self.assertEqual(ha.to_string(), self.sample_01) #test load w/ no path hb = apache.HtpasswdFile() self.assertRaises(RuntimeError, hb.load) - self.assertRaises(RuntimeError, hb.load, force=False) + self.assertRaises(RuntimeError, hb.load_if_changed) - #test load w/ dups + #test load w/ dups and explicit path set_file(path, self.sample_dup) - hc = apache.HtpasswdFile(path) - self.assertTrue(hc.verify('user1','pass1')) + hc = apache.HtpasswdFile() + hc.load(path) + self.assertTrue(hc.check_password('user1','pass1')) + + # NOTE: load_string() tested via from_string(), which is used all over this file def test_06_save(self): "test save()" @@ -155,44 +204,37 @@ class HtpasswdFileTest(TestCase): self.assertEqual(get_file(path), self.sample_02) #test save w/ no path - hb = apache.HtpasswdFile() - hb.update("user1", "pass1") + hb = apache.HtpasswdFile(default_scheme="plaintext") + hb.set_password("user1", "pass1") self.assertRaises(RuntimeError, hb.save) + # test save w/ explicit path + hb.save(path) + self.assertEqual(get_file(path), b("user1:pass1\n")) + def test_07_encodings(self): - "test encoding parameter behavior" - #test bad encodings cause failure in constructor + "test 'encoding' kwd" + # test bad encodings cause failure in constructor self.assertRaises(ValueError, apache.HtpasswdFile, encoding="utf-16") - #check users() returns native string by default - ht = apache.HtpasswdFile._from_string(self.sample_01) - self.assertIsInstance(ht.users()[0], str) - - #check returns unicode if encoding explicitly set - ht = apache.HtpasswdFile._from_string(self.sample_01, encoding="utf-8") - self.assertIsInstance(ht.users()[0], unicode) - - #check returns bytes if encoding explicitly disabled - ht = apache.HtpasswdFile._from_string(self.sample_01, encoding=None) - self.assertIsInstance(ht.users()[0], bytes) - - #check sample utf-8 - ht = apache.HtpasswdFile._from_string(self.sample_04_utf8, encoding="utf-8") + # check sample utf-8 + ht = apache.HtpasswdFile.from_string(self.sample_04_utf8, encoding="utf-8", + return_unicode=True) self.assertEqual(ht.users(), [ u("user\u00e6") ]) - #check sample latin-1 - ht = apache.HtpasswdFile._from_string(self.sample_04_latin1, - encoding="latin-1") + # check sample latin-1 + ht = apache.HtpasswdFile.from_string(self.sample_04_latin1, + encoding="latin-1", return_unicode=True) self.assertEqual(ht.users(), [ u("user\u00e6") ]) def test_08_to_string(self): "test to_string" - #check with known sample - ht = apache.HtpasswdFile._from_string(self.sample_01) + # check with known sample + ht = apache.HtpasswdFile.from_string(self.sample_01) self.assertEqual(ht.to_string(), self.sample_01) - #test blank + # test blank ht = apache.HtpasswdFile() self.assertEqual(ht.to_string(), b("")) @@ -207,10 +249,24 @@ class HtdigestFileTest(TestCase): "test HtdigestFile class" descriptionPrefix = "HtdigestFile" - sample_01 = b('user2:realm:549d2a5f4659ab39a80dac99e159ab19\nuser3:realm:a500bb8c02f6a9170ae46af10c898744\nuser4:realm:ab7b5d5f28ccc7666315f508c7358519\nuser1:realm:2a6cf53e7d8f8cf39d946dc880b14128\n') - sample_02 = b('user3:realm:a500bb8c02f6a9170ae46af10c898744\nuser4:realm:ab7b5d5f28ccc7666315f508c7358519\n') - sample_03 = b('user2:realm:5ba6d8328943c23c64b50f8b29566059\nuser3:realm:a500bb8c02f6a9170ae46af10c898744\nuser4:realm:ab7b5d5f28ccc7666315f508c7358519\nuser1:realm:2a6cf53e7d8f8cf39d946dc880b14128\nuser5:realm:03c55fdc6bf71552356ad401bdb9af19\n') + # sample with 4 users + sample_01 = b('user2:realm:549d2a5f4659ab39a80dac99e159ab19\n' + 'user3:realm:a500bb8c02f6a9170ae46af10c898744\n' + 'user4:realm:ab7b5d5f28ccc7666315f508c7358519\n' + 'user1:realm:2a6cf53e7d8f8cf39d946dc880b14128\n') + + # sample 1 with user 1, 2 deleted; 4 changed + sample_02 = b('user3:realm:a500bb8c02f6a9170ae46af10c898744\n' + 'user4:realm:ab7b5d5f28ccc7666315f508c7358519\n') + # sample 1 with user2 updated, user 1 first entry removed, and user 5 added + sample_03 = b('user2:realm:5ba6d8328943c23c64b50f8b29566059\n' + 'user3:realm:a500bb8c02f6a9170ae46af10c898744\n' + 'user4:realm:ab7b5d5f28ccc7666315f508c7358519\n' + 'user1:realm:2a6cf53e7d8f8cf39d946dc880b14128\n' + 'user5:realm:03c55fdc6bf71552356ad401bdb9af19\n') + + # standalone sample with 8-bit username & realm sample_04_utf8 = b('user\xc3\xa6:realm\xc3\xa6:549d2a5f4659ab39a80dac99e159ab19\n') sample_04_latin1 = b('user\xe6:realm\xe6:549d2a5f4659ab39a80dac99e159ab19\n') @@ -219,61 +275,101 @@ class HtdigestFileTest(TestCase): if gae_env: return self.skipTest("GAE doesn't offer read/write filesystem access") - #check with existing file + # check with existing file path = mktemp() set_file(path, self.sample_01) ht = apache.HtdigestFile(path) self.assertEqual(ht.to_string(), self.sample_01) - #check autoload=False - ht = apache.HtdigestFile(path, autoload=False) + # check without autoload + ht = apache.HtdigestFile(path, new=True) self.assertEqual(ht.to_string(), b("")) - #check missing file + # check missing file os.remove(path) self.assertRaises(IOError, apache.HtdigestFile, path) + # NOTE: default_realm option checked via other tests. + def test_01_delete(self): "test delete()" - ht = apache.HtdigestFile._from_string(self.sample_01) + ht = apache.HtdigestFile.from_string(self.sample_01) self.assertTrue(ht.delete("user1", "realm")) self.assertTrue(ht.delete("user2", "realm")) - self.assertTrue(not ht.delete("user5", "realm")) + self.assertFalse(ht.delete("user5", "realm")) + self.assertFalse(ht.delete("user3", "realm5")) self.assertEqual(ht.to_string(), self.sample_02) + # invalid user self.assertRaises(ValueError, ht.delete, "user:", "realm") - def test_02_update(self): + # invalid realm + self.assertRaises(ValueError, ht.delete, "user", "realm:") + + def test_01_delete_autosave(self): + if gae_env: + return self.skipTest("GAE doesn't offer read/write filesystem access") + path = mktemp() + set_file(path, self.sample_01) + + ht = apache.HtdigestFile(path) + self.assertTrue(ht.delete("user1", "realm")) + self.assertFalse(ht.delete("user3", "realm5")) + self.assertFalse(ht.delete("user5", "realm")) + self.assertEqual(get_file(path), self.sample_01) + + ht.autosave = True + self.assertTrue(ht.delete("user2", "realm")) + self.assertEqual(get_file(path), self.sample_02) + + def test_02_set_password(self): "test update()" - ht = apache.HtdigestFile._from_string(self.sample_01) - self.assertTrue(ht.update("user2", "realm", "pass2x")) - self.assertTrue(not ht.update("user5", "realm", "pass5")) + ht = apache.HtdigestFile.from_string(self.sample_01) + self.assertTrue(ht.set_password("user2", "realm", "pass2x")) + self.assertFalse(ht.set_password("user5", "realm", "pass5")) self.assertEqual(ht.to_string(), self.sample_03) - self.assertRaises(ValueError, ht.update, "user:", "realm", "pass") - self.assertRaises(ValueError, ht.update, "u"*256, "realm", "pass") + # default realm + self.assertRaises(TypeError, ht.set_password, "user2", "pass3") + ht.default_realm = "realm2" + ht.set_password("user2", "pass3") + ht.check_password("user2", "realm2", "pass3") - self.assertRaises(ValueError, ht.update, "user", "realm:", "pass") - self.assertRaises(ValueError, ht.update, "user", "r"*256, "pass") + # invalid user + self.assertRaises(ValueError, ht.set_password, "user:", "realm", "pass") + self.assertRaises(ValueError, ht.set_password, "u"*256, "realm", "pass") + + # invalid realm + self.assertRaises(ValueError, ht.set_password, "user", "realm:", "pass") + self.assertRaises(ValueError, ht.set_password, "user", "r"*256, "pass") + + # TODO: test set_password autosave def test_03_users(self): "test users()" - ht = apache.HtdigestFile._from_string(self.sample_01) - ht.update("user5", "realm", "pass5") + ht = apache.HtdigestFile.from_string(self.sample_01) + ht.set_password("user5", "realm", "pass5") ht.delete("user3", "realm") - ht.update("user3", "realm", "pass3") + ht.set_password("user3", "realm", "pass3") self.assertEqual(ht.users("realm"), ["user2", "user4", "user1", "user5", "user3"]) - def test_04_verify(self): - "test verify()" - ht = apache.HtdigestFile._from_string(self.sample_01) - self.assertTrue(ht.verify("user5", "realm","pass5") is None) + def test_04_check_password(self): + "test check_password()" + ht = apache.HtdigestFile.from_string(self.sample_01) + self.assertIs(ht.check_password("user5", "realm","pass5"), None) for i in irange(1,5): i = str(i) - self.assertTrue(ht.verify("user"+i, "realm", "pass"+i)) - self.assertTrue(ht.verify("user"+i, "realm", "pass5") is False) + self.assertTrue(ht.check_password("user"+i, "realm", "pass"+i)) + self.assertIs(ht.check_password("user"+i, "realm", "pass5"), False) + + # default realm + self.assertRaises(TypeError, ht.check_password, "user5", "pass5") + ht.default_realm = "realm" + self.assertTrue(ht.check_password("user1", "pass1")) + self.assertIs(ht.check_password("user5", "pass5"), None) - self.assertRaises(ValueError, ht.verify, "user:", "realm", "pass") + # invalid user + self.assertRaises(ValueError, ht.check_password, "user:", "realm", "pass") def test_05_load(self): "test load()" @@ -287,25 +383,30 @@ class HtdigestFileTest(TestCase): ha = apache.HtdigestFile(path) self.assertEqual(ha.to_string(), b("")) - #make changes, check force=False does nothing - ha.update("user1", "realm", "pass1") - ha.load(force=False) + #make changes, check load_if_changed() does nothing + ha.set_password("user1", "realm", "pass1") + ha.load_if_changed() self.assertEqual(ha.to_string(), b('user1:realm:2a6cf53e7d8f8cf39d946dc880b14128\n')) #change file set_file(path, self.sample_01) - ha.load(force=False) + ha.load_if_changed() self.assertEqual(ha.to_string(), self.sample_01) #make changes, check force=True overwrites them - ha.update("user5", "realm", "pass5") + ha.set_password("user5", "realm", "pass5") ha.load() self.assertEqual(ha.to_string(), self.sample_01) #test load w/ no path hb = apache.HtdigestFile() self.assertRaises(RuntimeError, hb.load) - self.assertRaises(RuntimeError, hb.load, force=False) + self.assertRaises(RuntimeError, hb.load_if_changed) + + # test load w/ explicit path + hc = apache.HtdigestFile() + hc.load(path) + self.assertEqual(hc.to_string(), self.sample_01) def test_06_save(self): "test save()" @@ -325,12 +426,16 @@ class HtdigestFileTest(TestCase): #test save w/ no path hb = apache.HtdigestFile() - hb.update("user1", "realm", "pass1") + hb.set_password("user1", "realm", "pass1") self.assertRaises(RuntimeError, hb.save) + # test save w/ explicit path + hb.save(path) + self.assertEqual(get_file(path), hb.to_string()) + def test_07_realms(self): "test realms() & delete_realm()" - ht = apache.HtdigestFile._from_string(self.sample_01) + ht = apache.HtdigestFile.from_string(self.sample_01) self.assertEqual(ht.delete_realm("x"), 0) self.assertEqual(ht.realms(), ['realm']) @@ -339,52 +444,36 @@ class HtdigestFileTest(TestCase): self.assertEqual(ht.realms(), []) self.assertEqual(ht.to_string(), b("")) - def test_08_find(self): - "test find()" - ht = apache.HtdigestFile._from_string(self.sample_01) - self.assertEqual(ht.find("user3", "realm"), "a500bb8c02f6a9170ae46af10c898744") - self.assertEqual(ht.find("user4", "realm"), "ab7b5d5f28ccc7666315f508c7358519") - self.assertEqual(ht.find("user5", "realm"), None) + def test_08_get_hash(self): + "test get_hash()" + ht = apache.HtdigestFile.from_string(self.sample_01) + self.assertEqual(ht.get_hash("user3", "realm"), "a500bb8c02f6a9170ae46af10c898744") + self.assertEqual(ht.get_hash("user4", "realm"), "ab7b5d5f28ccc7666315f508c7358519") + self.assertEqual(ht.get_hash("user5", "realm"), None) def test_09_encodings(self): "test encoding parameter" - #test bad encodings cause failure in constructor + # test bad encodings cause failure in constructor self.assertRaises(ValueError, apache.HtdigestFile, encoding="utf-16") - #check users() returns native string by default - ht = apache.HtdigestFile._from_string(self.sample_01) - self.assertIsInstance(ht.realms()[0], str) - self.assertIsInstance(ht.users("realm")[0], str) - - #check returns unicode if encoding explicitly set - ht = apache.HtdigestFile._from_string(self.sample_01, encoding="utf-8") - self.assertIsInstance(ht.realms()[0], unicode) - self.assertIsInstance(ht.users(u("realm"))[0], unicode) - - #check returns bytes if encoding explicitly disabled - ht = apache.HtdigestFile._from_string(self.sample_01, encoding=None) - self.assertIsInstance(ht.realms()[0], bytes) - self.assertIsInstance(ht.users(b("realm"))[0], bytes) - - #check sample utf-8 - ht = apache.HtdigestFile._from_string(self.sample_04_utf8, encoding="utf-8") + # check sample utf-8 + ht = apache.HtdigestFile.from_string(self.sample_04_utf8, encoding="utf-8", return_unicode=True) self.assertEqual(ht.realms(), [ u("realm\u00e6") ]) self.assertEqual(ht.users(u("realm\u00e6")), [ u("user\u00e6") ]) - #check sample latin-1 - ht = apache.HtdigestFile._from_string(self.sample_04_latin1, encoding="latin-1") + # check sample latin-1 + ht = apache.HtdigestFile.from_string(self.sample_04_latin1, encoding="latin-1", return_unicode=True) self.assertEqual(ht.realms(), [ u("realm\u00e6") ]) self.assertEqual(ht.users(u("realm\u00e6")), [ u("user\u00e6") ]) - def test_10_to_string(self): "test to_string()" - #check sample - ht = apache.HtdigestFile._from_string(self.sample_01) + # check sample + ht = apache.HtdigestFile.from_string(self.sample_01) self.assertEqual(ht.to_string(), self.sample_01) - #check blank + # check blank ht = apache.HtdigestFile() self.assertEqual(ht.to_string(), b("")) diff --git a/passlib/tests/test_apps.py b/passlib/tests/test_apps.py index d48654d..1758c38 100644 --- a/passlib/tests/test_apps.py +++ b/passlib/tests/test_apps.py @@ -25,7 +25,7 @@ class AppsTest(TestCase): def test_custom_app_context(self): ctx = apps.custom_app_context - self.assertEqual(ctx.policy.schemes(), ["sha512_crypt", "sha256_crypt"]) + self.assertEqual(ctx.schemes(), ("sha512_crypt", "sha256_crypt")) for hash in [ ('$6$rounds=41128$VoQLvDjkaZ6L6BIE$4pt.1Ll1XdDYduEwEYPCMOBiR6W6' 'znsyUEoNlcVXpv2gKKIbQolgmTGe6uEEVJ7azUxuc8Tf7zV9SD2z7Ij751'), @@ -93,10 +93,12 @@ class AppsTest(TestCase): h1 = '$2a$10$Ljj0Kgu7Ddob9xWoqzn0ae.uNfxPRofowWdksk.6jCUHKTGYLD.QG' if hashmod.bcrypt.has_backend(): self.assertTrue(ctx.verify("test", h1)) - self.assertEqual(ctx.policy.get_handler().name, "bcrypt") + self.assertEqual(ctx.default_scheme(), "bcrypt") + self.assertEqual(ctx.handler().name, "bcrypt") else: self.assertEqual(ctx.identify(h1), "bcrypt") - self.assertEqual(ctx.policy.get_handler().name, "phpass") + self.assertEqual(ctx.default_scheme(), "phpass") + self.assertEqual(ctx.handler().name, "phpass") def test_phpbb3_context(self): ctx = apps.phpbb3_context diff --git a/passlib/tests/test_context.py b/passlib/tests/test_context.py index d1e4511..d039ac6 100644 --- a/passlib/tests/test_context.py +++ b/passlib/tests/test_context.py @@ -1,9 +1,14 @@ -"""tests for passlib.pwhash -- (c) Assurance Technologies 2003-2009""" +"""tests for passlib.context""" #========================================================= #imports #========================================================= from __future__ import with_statement +from passlib.utils.compat import PY3 #core +if PY3: + from configparser import NoSectionError +else: + from ConfigParser import NoSectionError import hashlib from logging import getLogger import os @@ -17,7 +22,7 @@ except ImportError: resource_filename = None #pkg from passlib import hash -from passlib.context import CryptContext, CryptPolicy, LazyCryptContext +from passlib.context import CryptContext, LazyCryptContext from passlib.exc import PasslibConfigWarning from passlib.utils import tick, to_bytes, to_unicode from passlib.utils.compat import irange, u @@ -25,90 +30,105 @@ import passlib.utils.handlers as uh from passlib.tests.utils import TestCase, mktemp, catch_warnings, \ gae_env, set_file from passlib.registry import register_crypt_handler_path, has_crypt_handler, \ - _unload_handler_name as unload_handler_name + _unload_handler_name as unload_handler_name, get_crypt_handler #module log = getLogger(__name__) +#========================================================= +# support +#========================================================= +here = os.path.abspath(os.path.dirname(__file__)) + +def merge_dicts(first, *args, **kwds): + target = first.copy() + for arg in args: + target.update(arg) + if kwds: + target.update(kwds) + return target #========================================================= # #========================================================= -class CryptPolicyTest(TestCase): - "test CryptPolicy object" - - #TODO: need to test user categories w/in all this +class CryptContextTest(TestCase): + descriptionPrefix = "CryptContext" - descriptionPrefix = "CryptPolicy" + # TODO: these unittests could really use a good cleanup + # and reorganizing, to ensure they're getting everything. #========================================================= - #sample crypt policies used for testing + # sample configurations used in tests #========================================================= #----------------------------------------------------- - #sample 1 - average config file + # sample 1 - typical configuration #----------------------------------------------------- - #NOTE: copy of this is stored in file passlib/tests/sample_config_1s.cfg - sample_config_1s = """\ -[passlib] -schemes = des_crypt, md5_crypt, bsdi_crypt, sha512_crypt -default = md5_crypt -all.vary_rounds = 10%% -bsdi_crypt.max_rounds = 30000 -bsdi_crypt.default_rounds = 25000 -sha512_crypt.max_rounds = 50000 -sha512_crypt.min_rounds = 40000 -""" - sample_config_1s_path = os.path.abspath(os.path.join( - os.path.dirname(__file__), "sample_config_1s.cfg")) - if not os.path.exists(sample_config_1s_path) and resource_filename: - #in case we're zipped up in an egg. - sample_config_1s_path = resource_filename("passlib.tests", - "sample_config_1s.cfg") - - #make sure sample_config_1s uses \n linesep - tests rely on this - assert sample_config_1s.startswith("[passlib]\nschemes") - - sample_config_1pd = dict( - schemes = [ "des_crypt", "md5_crypt", "bsdi_crypt", "sha512_crypt"], + sample_1_schemes = ["des_crypt", "md5_crypt", "bsdi_crypt", "sha512_crypt"] + sample_1_handlers = [get_crypt_handler(name) for name in sample_1_schemes] + + sample_1_dict = dict( + schemes = sample_1_schemes, default = "md5_crypt", - all__vary_rounds = "10%", + all__vary_rounds = 0.1, bsdi_crypt__max_rounds = 30000, bsdi_crypt__default_rounds = 25000, sha512_crypt__max_rounds = 50000, sha512_crypt__min_rounds = 40000, ) - sample_config_1pid = { - "schemes": "des_crypt, md5_crypt, bsdi_crypt, sha512_crypt", - "default": "md5_crypt", - "all.vary_rounds": "10%", - "bsdi_crypt.max_rounds": 30000, - "bsdi_crypt.default_rounds": 25000, - "sha512_crypt.max_rounds": 50000, - "sha512_crypt.min_rounds": 40000, - } - - sample_config_1prd = dict( - schemes = [ hash.des_crypt, hash.md5_crypt, hash.bsdi_crypt, hash.sha512_crypt], - default = "md5_crypt", # NOTE: passlib <= 1.5 was handler obj. - all__vary_rounds = "10%", - bsdi_crypt__max_rounds = 30000, - bsdi_crypt__default_rounds = 25000, - sha512_crypt__max_rounds = 50000, - sha512_crypt__min_rounds = 40000, - ) + sample_1_resolved_dict = merge_dicts(sample_1_dict, + schemes = sample_1_handlers) + + sample_1_unnormalized = u("""\ +[passlib] +schemes = des_crypt, md5_crypt, bsdi_crypt, sha512_crypt +default = md5_crypt +; this is using %... +all__vary_rounds = 10%% +; this is using 'rounds' instead of 'default_rounds' +bsdi_crypt__rounds = 25000 +bsdi_crypt__max_rounds = 30000 +sha512_crypt__max_rounds = 50000 +sha512_crypt__min_rounds = 40000 +""") + + sample_1_unicode = u("""\ +[passlib] +schemes = des_crypt, md5_crypt, bsdi_crypt, sha512_crypt +default = md5_crypt +all__vary_rounds = 0.1 +bsdi_crypt__default_rounds = 25000 +bsdi_crypt__max_rounds = 30000 +sha512_crypt__max_rounds = 50000 +sha512_crypt__min_rounds = 40000 + +""") #----------------------------------------------------- - #sample 2 - partial policy & result of overlay on sample 1 + # sample 1 external files #----------------------------------------------------- - sample_config_2s = """\ -[passlib] -bsdi_crypt.min_rounds = 29000 -bsdi_crypt.max_rounds = 35000 -bsdi_crypt.default_rounds = 31000 -sha512_crypt.min_rounds = 45000 -""" - sample_config_2pd = dict( + # sample 1 string with '\n' linesep + sample_1_path = os.path.join(here, "sample1.cfg") + + # sample 1 with '\r\n' linesep + sample_1b_unicode = sample_1_unicode.replace(u("\n"), u("\r\n")) + sample_1b_path = os.path.join(here, "sample1b.cfg") + + # sample 1 using UTF-16 and alt section + sample_1c_bytes = sample_1_unicode.replace(u("[passlib]"), + u("[mypolicy]")).encode("utf-16") + sample_1c_path = os.path.join(here, "sample1c.cfg") + + # enable to regenerate sample files + if False: + set_file(sample_1_path, sample_1_unicode) + set_file(sample_1b_path, sample_1b_unicode) + set_file(sample_1c_path, sample_1c_bytes) + + #----------------------------------------------------- + # sample 2 & 12 - options patch + #----------------------------------------------------- + sample_2_dict = dict( #using this to test full replacement of existing options bsdi_crypt__min_rounds = 29000, bsdi_crypt__max_rounds = 35000, @@ -117,511 +137,891 @@ sha512_crypt.min_rounds = 45000 sha512_crypt__min_rounds=45000, ) - sample_config_12pd = dict( - schemes = [ "des_crypt", "md5_crypt", "bsdi_crypt", "sha512_crypt"], - default = "md5_crypt", - all__vary_rounds = "10%", - bsdi_crypt__min_rounds = 29000, - bsdi_crypt__max_rounds = 35000, - bsdi_crypt__default_rounds = 31000, - sha512_crypt__max_rounds = 50000, - sha512_crypt__min_rounds=45000, - ) + sample_2_unicode = """\ +[passlib] +bsdi_crypt__min_rounds = 29000 +bsdi_crypt__max_rounds = 35000 +bsdi_crypt__default_rounds = 31000 +sha512_crypt__min_rounds = 45000 +""" + + # sample 2 overlayed on top of sample 1 + sample_12_dict = merge_dicts(sample_1_dict, sample_2_dict) #----------------------------------------------------- - #sample 3 - just changing default + # sample 3 & 123 - just changing default from sample 1 #----------------------------------------------------- - sample_config_3pd = dict( + sample_3_dict = dict( default="sha512_crypt", ) - sample_config_123pd = dict( - schemes = [ "des_crypt", "md5_crypt", "bsdi_crypt", "sha512_crypt"], - default = "sha512_crypt", - all__vary_rounds = "10%", - bsdi_crypt__min_rounds = 29000, - bsdi_crypt__max_rounds = 35000, - bsdi_crypt__default_rounds = 31000, - sha512_crypt__max_rounds = 50000, - sha512_crypt__min_rounds=45000, - ) + # sample 3 overlayed on 2 overlayed on 1 + sample_123_dict = merge_dicts(sample_12_dict, sample_3_dict) #----------------------------------------------------- - #sample 4 - category specific + # sample 4 - used by api tests #----------------------------------------------------- - sample_config_4s = """ -[passlib] -schemes = sha512_crypt -all.vary_rounds = 10%% -default.sha512_crypt.max_rounds = 20000 -admin.all.vary_rounds = 5%% -admin.sha512_crypt.max_rounds = 40000 -""" + sample_4_dict = dict( + schemes = [ "des_crypt", "md5_crypt", "phpass", "bsdi_crypt", + "sha256_crypt"], + deprecated = [ "des_crypt", ], + default = "sha256_crypt", + bsdi_crypt__max_rounds = 30, + bsdi_crypt__default_rounds = 25, + bsdi_crypt__vary_rounds = 0, + sha256_crypt__max_rounds = 3000, + sha256_crypt__min_rounds = 2000, + sha256_crypt__default_rounds = 3000, + phpass__ident = "H", + phpass__default_rounds = 7, + ) - sample_config_4pd = dict( - schemes = [ "sha512_crypt" ], - all__vary_rounds = "10%", - sha512_crypt__max_rounds = 20000, - admin__all__vary_rounds = "5%", - admin__sha512_crypt__max_rounds = 40000, - ) + #========================================================= + # constructors + #========================================================= + def test_01_constructor(self): + "test class constructor" - #----------------------------------------------------- - #sample 5 - to_string & deprecation testing - #----------------------------------------------------- - sample_config_5s = sample_config_1s + """\ -deprecated = des_crypt -admin__context__deprecated = des_crypt, bsdi_crypt -""" + # test blank constructor works correctly + ctx = CryptContext() + self.assertEqual(ctx.to_dict(), {}) - sample_config_5pd = sample_config_1pd.copy() - sample_config_5pd.update( - deprecated = [ "des_crypt" ], - admin__context__deprecated = [ "des_crypt", "bsdi_crypt" ], - ) + # test sample 1 with scheme=names + ctx = CryptContext(**self.sample_1_dict) + self.assertEqual(ctx.to_dict(), self.sample_1_dict) + + # test sample 1 with scheme=handlers + ctx = CryptContext(**self.sample_1_resolved_dict) + self.assertEqual(ctx.to_dict(), self.sample_1_dict) + + # test sample 2: options w/o schemes + ctx = CryptContext(**self.sample_2_dict) + self.assertEqual(ctx.to_dict(), self.sample_2_dict) - sample_config_5pid = sample_config_1pid.copy() - sample_config_5pid.update({ - "deprecated": "des_crypt", - "admin.context.deprecated": "des_crypt, bsdi_crypt", - }) + # test sample 3: default only + ctx = CryptContext(**self.sample_3_dict) + self.assertEqual(ctx.to_dict(), self.sample_3_dict) - sample_config_5prd = sample_config_1prd.copy() - sample_config_5prd.update({ - # XXX: should deprecated return the actual handlers in this case? - # would have to modify how policy stores info, for one. - "deprecated": ["des_crypt"], - "admin__context__deprecated": ["des_crypt", "bsdi_crypt"], - }) + def test_02_from_string(self): + "test from_string() constructor" + # test sample 1 unicode + ctx = CryptContext.from_string(self.sample_1_unicode) + self.assertEqual(ctx.to_dict(), self.sample_1_dict) + + # test sample 1 with unnormalized inputs + ctx = CryptContext.from_string(self.sample_1_unnormalized) + self.assertEqual(ctx.to_dict(), self.sample_1_dict) + + # test sample 1 utf-8 + ctx = CryptContext.from_string(self.sample_1_unicode.encode("utf-8")) + self.assertEqual(ctx.to_dict(), self.sample_1_dict) + + # test sample 1 w/ '\r\n' linesep + ctx = CryptContext.from_string(self.sample_1b_unicode) + self.assertEqual(ctx.to_dict(), self.sample_1_dict) + + # test sample 1 using UTF-16 and alt section + ctx = CryptContext.from_string(self.sample_1c_bytes, section="mypolicy", + encoding="utf-16") + self.assertEqual(ctx.to_dict(), self.sample_1_dict) + + # test wrong type + self.assertRaises(TypeError, CryptContext.from_string, None) + + # test missing section + self.assertRaises(NoSectionError, CryptContext.from_string, + self.sample_1_unicode, section="fakesection") + + def test_03_from_path(self): + "test from_path() constructor" + # make sure sample files exist + if not os.path.exists(self.sample_1_path): + raise RuntimeError("can't find data file: %r" % self.sample_1_path) + + # test sample 1 + ctx = CryptContext.from_path(self.sample_1_path) + self.assertEqual(ctx.to_dict(), self.sample_1_dict) + + # test sample 1 w/ '\r\n' linesep + ctx = CryptContext.from_path(self.sample_1b_path) + self.assertEqual(ctx.to_dict(), self.sample_1_dict) + + # test sample 1 encoding using UTF-16 and alt section + ctx = CryptContext.from_path(self.sample_1c_path, section="mypolicy", + encoding="utf-16") + self.assertEqual(ctx.to_dict(), self.sample_1_dict) + + # test missing file + self.assertRaises(EnvironmentError, CryptContext.from_path, + os.path.join(here, "sample1xxx.cfg")) + + # test missing section + self.assertRaises(NoSectionError, CryptContext.from_path, + self.sample_1_path, section="fakesection") + + def test_04_copy(self): + "test copy() method" + cc1 = CryptContext(**self.sample_1_dict) + + # overlay sample 2 onto copy + cc2 = cc1.copy(**self.sample_2_dict) + self.assertEqual(cc1.to_dict(), self.sample_1_dict) + self.assertEqual(cc2.to_dict(), self.sample_12_dict) + + # check that repeating overlay makes no change + cc2b = cc2.copy(**self.sample_2_dict) + self.assertEqual(cc1.to_dict(), self.sample_1_dict) + self.assertEqual(cc2b.to_dict(), self.sample_12_dict) + + # overlay sample 3 on copy + cc3 = cc2.copy(**self.sample_3_dict) + self.assertEqual(cc3.to_dict(), self.sample_123_dict) + + # test empty copy creates separate copy + cc4 = cc1.copy() + self.assertIsNot(cc4, cc1) + self.assertEqual(cc1.to_dict(), self.sample_1_dict) + self.assertEqual(cc4.to_dict(), self.sample_1_dict) + + # ... and that modifying copy doesn't affect original + cc4.update(**self.sample_2_dict) + self.assertEqual(cc1.to_dict(), self.sample_1_dict) + self.assertEqual(cc4.to_dict(), self.sample_12_dict) #========================================================= - #constructors + # modifiers #========================================================= - def test_00_constructor(self): - "test CryptPolicy() constructor" - policy = CryptPolicy(**self.sample_config_1pd) - self.assertEqual(policy.to_dict(), self.sample_config_1pd) - - #check key with too many separators is rejected - self.assertRaises(TypeError, CryptPolicy, - schemes = [ "des_crypt", "md5_crypt", "bsdi_crypt", "sha512_crypt"], - bad__key__bsdi_crypt__max_rounds = 30000, + def test_10_load(self): + "test load() / load_path() method" + # NOTE: load() is the workhorse that handles all policy parsing, + # compilation, and validation. most of it's features are tested + # elsewhere, since all the constructors and modifiers are just + # wrappers for it. + + # source_type 'auto' + ctx = CryptContext() + + # detect dict + ctx.load(self.sample_1_dict) + self.assertEqual(ctx.to_dict(), self.sample_1_dict) + + # detect unicode string + ctx.load(self.sample_1_unicode) + self.assertEqual(ctx.to_dict(), self.sample_1_dict) + + # detect bytes string + ctx.load(self.sample_1_unicode.encode("utf-8")) + self.assertEqual(ctx.to_dict(), self.sample_1_dict) + + # anything else - TypeError + self.assertRaises(TypeError, ctx.load, None) + + # NOTE: load_path() tested by from_path() + # NOTE: additional string tests done by from_string() + + # update flag - tested by update() method tests + # encoding keyword - tested by from_string() & from_path() + # section keyword - tested by from_string() & from_path() + + # multiple loads should clear the state + ctx = CryptContext() + ctx.load(self.sample_1_dict) + ctx.load(self.sample_2_dict) + self.assertEqual(ctx.to_dict(), self.sample_2_dict) + + def test_11_load_rollback(self): + "test load() errors restore old state" + # create initial context + cc = CryptContext(["des_crypt", "sha256_crypt"], + sha256_crypt__default_rounds=5000, + all__vary_rounds=0.1, ) + result = cc.to_string() - #check nameless handler rejected - class nameless(uh.StaticHandler): - name = None - self.assertRaises(ValueError, CryptPolicy, schemes=[nameless]) + # do an update operation that should fail during parsing + # XXX: not sure what the right error type is here. + self.assertRaises(TypeError, cc.update, too__many__key__parts=True) + self.assertEqual(cc.to_string(), result) - # check scheme must be name or crypt handler - self.assertRaises(TypeError, CryptPolicy, schemes=[uh.StaticHandler]) + # do an update operation that should fail during extraction + # FIXME: this isn't failing even in broken case, need to figure out + # way to ensure some keys come after this one. + self.assertRaises(KeyError, cc.update, fake_context_option=True) + self.assertEqual(cc.to_string(), result) - #check name conflicts are rejected - class dummy_1(uh.StaticHandler): - name = 'dummy_1' - self.assertRaises(KeyError, CryptPolicy, schemes=[dummy_1, dummy_1]) + # do an update operation that should fail during compilation + self.assertRaises(ValueError, cc.update, sha256_crypt__min_rounds=10000) + self.assertEqual(cc.to_string(), result) - #with unknown deprecated value - self.assertRaises(KeyError, CryptPolicy, - schemes=['des_crypt'], - deprecated=['md5_crypt']) + def test_12_update(self): + "test update() method" - #with unknown default value - self.assertRaises(KeyError, CryptPolicy, - schemes=['des_crypt'], - default='md5_crypt') + # empty overlay + ctx = CryptContext(**self.sample_1_dict) + ctx.update() + self.assertEqual(ctx.to_dict(), self.sample_1_dict) - def test_01_from_path_simple(self): - "test CryptPolicy.from_path() constructor" - #NOTE: this is separate so it can also run under GAE + # test basic overlay + ctx = CryptContext(**self.sample_1_dict) + ctx.update(**self.sample_2_dict) + self.assertEqual(ctx.to_dict(), self.sample_12_dict) - #test preset stored in existing file - path = self.sample_config_1s_path - policy = CryptPolicy.from_path(path) - self.assertEqual(policy.to_dict(), self.sample_config_1pd) + # ... and again + ctx.update(**self.sample_3_dict) + self.assertEqual(ctx.to_dict(), self.sample_123_dict) - #test if path missing - self.assertRaises(EnvironmentError, CryptPolicy.from_path, path + 'xxx') + # overlay w/ dict arg + ctx = CryptContext(**self.sample_1_dict) + ctx.update(self.sample_2_dict) + self.assertEqual(ctx.to_dict(), self.sample_12_dict) - def test_01_from_path(self): - "test CryptPolicy.from_path() constructor with encodings" - if gae_env: - return self.skipTest("GAE doesn't offer read/write filesystem access") + # overlay w/ string + ctx = CryptContext(**self.sample_1_dict) + ctx.update(self.sample_2_unicode) + self.assertEqual(ctx.to_dict(), self.sample_12_dict) - path = mktemp() + # too many args + self.assertRaises(TypeError, ctx.update, {}, {}) + self.assertRaises(TypeError, ctx.update, {}, schemes=['des_crypt']) - #test "\n" linesep - set_file(path, self.sample_config_1s) - policy = CryptPolicy.from_path(path) - self.assertEqual(policy.to_dict(), self.sample_config_1pd) + # wrong arg type + self.assertRaises(TypeError, ctx.update, None) - #test "\r\n" linesep - set_file(path, self.sample_config_1s.replace("\n","\r\n")) - policy = CryptPolicy.from_path(path) - self.assertEqual(policy.to_dict(), self.sample_config_1pd) + #========================================================= + # option parsing + #========================================================= + def test_20_options(self): + "test basic option parsing" + def parse(**kwds): + return CryptContext(**kwds).to_dict() - #test with custom encoding - uc2 = to_bytes(self.sample_config_1s, "utf-16", source_encoding="utf-8") - set_file(path, uc2) - policy = CryptPolicy.from_path(path, encoding="utf-16") - self.assertEqual(policy.to_dict(), self.sample_config_1pd) + # + # common option parsing tests + # - def test_02_from_string(self): - "test CryptPolicy.from_string() constructor" - #test "\n" linesep - policy = CryptPolicy.from_string(self.sample_config_1s) - self.assertEqual(policy.to_dict(), self.sample_config_1pd) - - #test "\r\n" linesep - policy = CryptPolicy.from_string( - self.sample_config_1s.replace("\n","\r\n")) - self.assertEqual(policy.to_dict(), self.sample_config_1pd) - - #test with unicode - data = to_unicode(self.sample_config_1s) - policy = CryptPolicy.from_string(data) - self.assertEqual(policy.to_dict(), self.sample_config_1pd) - - #test with non-ascii-compatible encoding - uc2 = to_bytes(self.sample_config_1s, "utf-16", source_encoding="utf-8") - policy = CryptPolicy.from_string(uc2, encoding="utf-16") - self.assertEqual(policy.to_dict(), self.sample_config_1pd) - - #test category specific options - policy = CryptPolicy.from_string(self.sample_config_4s) - self.assertEqual(policy.to_dict(), self.sample_config_4pd) - - def test_03_from_source(self): - "test CryptPolicy.from_source() constructor" - #pass it a path - policy = CryptPolicy.from_source(self.sample_config_1s_path) - self.assertEqual(policy.to_dict(), self.sample_config_1pd) - - #pass it a string - policy = CryptPolicy.from_source(self.sample_config_1s) - self.assertEqual(policy.to_dict(), self.sample_config_1pd) - - #pass it a dict (NOTE: make a copy to detect in-place modifications) - policy = CryptPolicy.from_source(self.sample_config_1pd.copy()) - self.assertEqual(policy.to_dict(), self.sample_config_1pd) - - #pass it existing policy - p2 = CryptPolicy.from_source(policy) - self.assertIs(policy, p2) - - #pass it something wrong - self.assertRaises(TypeError, CryptPolicy.from_source, 1) - self.assertRaises(TypeError, CryptPolicy.from_source, []) - - def test_04_from_sources(self): - "test CryptPolicy.from_sources() constructor" - - #pass it empty list - self.assertRaises(ValueError, CryptPolicy.from_sources, []) - - #pass it one-element list - policy = CryptPolicy.from_sources([self.sample_config_1s]) - self.assertEqual(policy.to_dict(), self.sample_config_1pd) - - #pass multiple sources - policy = CryptPolicy.from_sources( - [ - self.sample_config_1s_path, - self.sample_config_2s, - self.sample_config_3pd, - ]) - self.assertEqual(policy.to_dict(), self.sample_config_123pd) - - def test_05_replace(self): - "test CryptPolicy.replace() constructor" - - p1 = CryptPolicy(**self.sample_config_1pd) - - #check overlaying sample 2 - p2 = p1.replace(**self.sample_config_2pd) - self.assertEqual(p2.to_dict(), self.sample_config_12pd) - - #check repeating overlay makes no change - p2b = p2.replace(**self.sample_config_2pd) - self.assertEqual(p2b.to_dict(), self.sample_config_12pd) - - #check overlaying sample 3 - p3 = p2.replace(self.sample_config_3pd) - self.assertEqual(p3.to_dict(), self.sample_config_123pd) - - def test_06_forbidden(self): - "test CryptPolicy() forbidden kwds" - - #salt not allowed to be set - self.assertRaises(KeyError, CryptPolicy, - schemes=["des_crypt"], - des_crypt__salt="xx", - ) - self.assertRaises(KeyError, CryptPolicy, - schemes=["des_crypt"], - all__salt="xx", - ) + # test key with blank separators is rejected + self.assertRaises(TypeError, CryptContext, __=0.1) + self.assertRaises(TypeError, CryptContext, __default='x') + self.assertRaises(TypeError, CryptContext, default____default='x') + self.assertRaises(TypeError, CryptContext, __default____default='x') - #schemes not allowed for category - self.assertRaises(KeyError, CryptPolicy, - schemes=["des_crypt"], - user__context__schemes=["md5_crypt"], - ) + # test key with too many separators is rejected + self.assertRaises(TypeError, CryptContext, + category__scheme__option__invalid = 30000) - #========================================================= - #reading - #========================================================= - def test_10_has_schemes(self): - "test has_schemes() method" + # + # context option -specific tests + # - p1 = CryptPolicy(**self.sample_config_1pd) - self.assertTrue(p1.has_schemes()) + # test context option key parsing + result = dict(default="md5_crypt") + self.assertEqual(parse(default="md5_crypt"), result) + self.assertEqual(parse(context__default="md5_crypt"), result) + self.assertEqual(parse(default__context__default="md5_crypt"), result) + self.assertEqual(parse(**{"context.default":"md5_crypt"}), result) + self.assertEqual(parse(**{"default.context.default":"md5_crypt"}), result) - p3 = CryptPolicy(**self.sample_config_3pd) - self.assertTrue(not p3.has_schemes()) + # test context option key parsing w/ category + result = dict(admin__context__default="md5_crypt") + self.assertEqual(parse(admin__context__default="md5_crypt"), result) + self.assertEqual(parse(**{"admin.context.default":"md5_crypt"}), result) - def test_11_iter_handlers(self): - "test iter_handlers() method" + # + # hash option -specific tests + # - p1 = CryptPolicy(**self.sample_config_1pd) - s = self.sample_config_1prd['schemes'] - self.assertEqual(list(p1.iter_handlers()), s) + # test hash option key parsing + result = dict(all__vary_rounds=0.1) + self.assertEqual(parse(all__vary_rounds=0.1), result) + self.assertEqual(parse(default__all__vary_rounds=0.1), result) + self.assertEqual(parse(**{"all.vary_rounds":0.1}), result) + self.assertEqual(parse(**{"default.all.vary_rounds":0.1}), result) - p3 = CryptPolicy(**self.sample_config_3pd) - self.assertEqual(list(p3.iter_handlers()), []) + # test hash option key parsing w/ category + result = dict(admin__all__vary_rounds=0.1) + self.assertEqual(parse(admin__all__vary_rounds=0.1), result) + self.assertEqual(parse(**{"admin.all.vary_rounds":0.1}), result) - def test_12_get_handler(self): - "test get_handler() method" + # settings not allowed if not in hash.settings_kwds + ctx = CryptContext(["phpass", "md5_crypt"], phpass__ident="P") + self.assertRaises(KeyError, ctx.copy, md5_crypt__ident="P") - p1 = CryptPolicy(**self.sample_config_1pd) + # hash options 'salt' and 'rounds' not allowed + self.assertRaises(KeyError, CryptContext, schemes=["des_crypt"], + des_crypt__salt="xx") + self.assertRaises(KeyError, CryptContext, schemes=["des_crypt"], + all__salt="xx") - #check by name - self.assertIs(p1.get_handler("bsdi_crypt"), hash.bsdi_crypt) + def test_21_schemes(self): + "test 'schemes' context option parsing" - #check by missing name - self.assertIs(p1.get_handler("sha256_crypt"), None) - self.assertRaises(KeyError, p1.get_handler, "sha256_crypt", required=True) + # schemes can be empty + cc = CryptContext(schemes=None) + self.assertEqual(cc.schemes(), ()) - #check default - self.assertIs(p1.get_handler(), hash.md5_crypt) + # schemes can be list of names + cc = CryptContext(schemes=["des_crypt", "md5_crypt"]) + self.assertEqual(cc.schemes(), ("des_crypt", "md5_crypt")) - def test_13_get_options(self): - "test get_options() method" + # schemes can be comma-sep string + cc = CryptContext(schemes=" des_crypt, md5_crypt, ") + self.assertEqual(cc.schemes(), ("des_crypt", "md5_crypt")) - p12 = CryptPolicy(**self.sample_config_12pd) + # schemes can be list of handlers + cc = CryptContext(schemes=[hash.des_crypt, hash.md5_crypt]) + self.assertEqual(cc.schemes(), ("des_crypt", "md5_crypt")) - self.assertEqual(p12.get_options("bsdi_crypt"),dict( - vary_rounds = "10%", - min_rounds = 29000, - max_rounds = 35000, - default_rounds = 31000, - )) + # scheme must be name or handler + self.assertRaises(TypeError, CryptContext, schemes=[uh.StaticHandler]) - self.assertEqual(p12.get_options("sha512_crypt"),dict( - vary_rounds = "10%", - min_rounds = 45000, - max_rounds = 50000, - )) + # handlers must have a name + class nameless(uh.StaticHandler): + name = None + self.assertRaises(ValueError, CryptContext, schemes=[nameless]) + + # names must be unique + class dummy_1(uh.StaticHandler): + name = 'dummy_1' + self.assertRaises(KeyError, CryptContext, schemes=[dummy_1, dummy_1]) + + # schemes not allowed per-category + self.assertRaises(KeyError, CryptContext, + admin__context__schemes=["md5_crypt"]) + + def test_22_deprecated(self): + "test 'deprecated' context option parsing" + def getdep(ctx, category=None): + return [name for name in ctx.schemes() + if ctx._is_deprecated_scheme(name, category)] + + # no schemes - all deprecated values allowed + cc = CryptContext(deprecated=["md5_crypt"]) + cc.update(schemes=["md5_crypt", "des_crypt"]) + self.assertEqual(getdep(cc),["md5_crypt"]) + + # deprecated values allowed if subset of schemes + cc = CryptContext(deprecated=["md5_crypt"], schemes=["md5_crypt", "des_crypt"]) + self.assertEqual(getdep(cc), ["md5_crypt"]) + + # can be handler + # XXX: allow handlers in deprecated list? not for now. + self.assertRaises(TypeError, CryptContext, deprecated=[hash.md5_crypt], + schemes=["md5_crypt", "des_crypt"]) +## cc = CryptContext(deprecated=[hash.md5_crypt], schemes=["md5_crypt", "des_crypt"]) +## self.assertEqual(getdep(cc), ["md5_crypt"]) + + # comma sep list + cc = CryptContext(deprecated="md5_crypt,des_crypt", schemes=["md5_crypt", "des_crypt"]) + self.assertEqual(getdep(cc), ["md5_crypt", "des_crypt"]) + + # values outside of schemes not allowed + self.assertRaises(KeyError, CryptContext, schemes=['des_crypt'], + deprecated=['md5_crypt']) + + # wrong type + self.assertRaises(TypeError, CryptContext, deprecated=123) + + # deprecated per-category + cc = CryptContext(deprecated=["md5_crypt"], + schemes=["md5_crypt", "des_crypt"], + admin__context__deprecated=["des_crypt"], + ) + self.assertEqual(getdep(cc), ["md5_crypt"]) + self.assertEqual(getdep(cc, "user"), ["md5_crypt"]) + self.assertEqual(getdep(cc, "admin"), ["des_crypt"]) + + def test_23_default(self): + "test 'default' context option parsing" + + # anything allowed if no schemes + self.assertEqual(CryptContext(default="md5_crypt").to_dict(), + dict(default="md5_crypt")) + + # default allowed if in scheme list + ctx = CryptContext(default="md5_crypt", schemes=["des_crypt", "md5_crypt"]) + self.assertEqual(ctx.default_scheme(), "md5_crypt") - p4 = CryptPolicy.from_string(self.sample_config_4s) - self.assertEqual(p4.get_options("sha512_crypt"), dict( - vary_rounds="10%", + # default can be handler + # XXX: sure we want to allow this ? maybe deprecate in future. + ctx = CryptContext(default=hash.md5_crypt, schemes=["des_crypt", "md5_crypt"]) + self.assertEqual(ctx.default_scheme(), "md5_crypt") + + # error if not in scheme list + self.assertRaises(KeyError, CryptContext, schemes=['des_crypt'], + default='md5_crypt') + + # wrong type + self.assertRaises(TypeError, CryptContext, default=1) + + # per-category + ctx = CryptContext(default="des_crypt", + schemes=["des_crypt", "md5_crypt"], + admin__context__default="md5_crypt") + self.assertEqual(ctx.default_scheme(), "des_crypt") + self.assertEqual(ctx.default_scheme("user"), "des_crypt") + self.assertEqual(ctx.default_scheme("admin"), "md5_crypt") + + def test_24_vary_rounds(self): + "test 'vary_rounds' hash option parsing" + def parse(v): + return CryptContext(all__vary_rounds=v).to_dict()['all__vary_rounds'] + + # floats should be preserved + self.assertEqual(parse(0.1), 0.1) + self.assertEqual(parse('0.1'), 0.1) + + # 'xx%' should be converted to float + self.assertEqual(parse('10%'), 0.1) + + # ints should be preserved + self.assertEqual(parse(1000), 1000) + self.assertEqual(parse('1000'), 1000) + + #========================================================= + # inspection & serialization + #========================================================= + def test_30_schemes(self): + "test schemes() method" + # NOTE: also checked under test_21 + + # test empty + ctx = CryptContext() + self.assertEqual(ctx.schemes(), ()) + self.assertEqual(ctx.schemes(resolve=True), ()) + + # test sample 1 + ctx = CryptContext(**self.sample_1_dict) + self.assertEqual(ctx.schemes(), tuple(self.sample_1_schemes)) + self.assertEqual(ctx.schemes(resolve=True), tuple(self.sample_1_handlers)) + + # test sample 2 + ctx = CryptContext(**self.sample_2_dict) + self.assertEqual(ctx.schemes(), ()) + + def test_31_default_scheme(self): + "test default_scheme() method" + # NOTE: also checked under test_23 + + # test empty + ctx = CryptContext() + self.assertRaises(KeyError, ctx.default_scheme) + + # test sample 1 + ctx = CryptContext(**self.sample_1_dict) + self.assertEqual(ctx.default_scheme(), "md5_crypt") + self.assertEqual(ctx.default_scheme(resolve=True), hash.md5_crypt) + + # test sample 2 + ctx = CryptContext(**self.sample_2_dict) + self.assertRaises(KeyError, ctx.default_scheme) + + # test defaults to first in scheme + ctx = CryptContext(schemes=self.sample_1_schemes) + self.assertEqual(ctx.default_scheme(), "des_crypt") + + # categories tested under test_23 + + def test_32_handler(self): + "test handler() method" + + # default for empty + ctx = CryptContext() + self.assertRaises(KeyError, ctx.handler) + + # default for sample 1 + ctx = CryptContext(**self.sample_1_dict) + self.assertEqual(ctx.handler(), hash.md5_crypt) + + # by name + self.assertEqual(ctx.handler("des_crypt"), hash.des_crypt) + + # name not in schemes + self.assertRaises(KeyError, ctx.handler, "mysql323") + + # TODO: per-category + + def test_33_options(self): + "test internal _get_record_options() method" + def options(ctx, scheme, category=None): + return ctx._get_record_options(scheme, category)[0] + + # this checks that (3 schemes, 3 categories) inherit options correctly. + # the 'user' category is not present in the options. + cc4 = CryptContext( + schemes = [ "sha512_crypt", "des_crypt", "bsdi_crypt"], + deprecated = ["sha512_crypt", "des_crypt"], + all__vary_rounds = 0.1, + bsdi_crypt__vary_rounds=0.2, + sha512_crypt__max_rounds = 20000, + admin__context__deprecated = [ "des_crypt", "bsdi_crypt" ], + admin__all__vary_rounds = 0.05, + admin__bsdi_crypt__vary_rounds=0.3, + admin__sha512_crypt__max_rounds = 40000, + ) + self.assertEqual(cc4._categories, ("admin",)) + + # + # sha512_crypt + # + self.assertEqual(options(cc4, "sha512_crypt"), dict( + deprecated=True, + vary_rounds=0.1, # inherited from all__ max_rounds=20000, )) - self.assertEqual(p4.get_options("sha512_crypt", "user"), dict( - vary_rounds="10%", + self.assertEqual(options(cc4, "sha512_crypt", "user"), dict( + deprecated=True, # unconfigured category inherits from default + vary_rounds=0.1, max_rounds=20000, )) - self.assertEqual(p4.get_options("sha512_crypt", "admin"), dict( - vary_rounds="5%", - max_rounds=40000, + self.assertEqual(options(cc4, "sha512_crypt", "admin"), dict( + # NOT deprecated - context option overridden per-category + vary_rounds=0.05, # global overridden per-cateogry + max_rounds=40000, # overridden per-category )) - def test_14_handler_is_deprecated(self): - "test handler_is_deprecated() method" - pa = CryptPolicy(**self.sample_config_1pd) - pb = CryptPolicy(**self.sample_config_5pd) - - self.assertFalse(pa.handler_is_deprecated("des_crypt")) - self.assertFalse(pa.handler_is_deprecated(hash.bsdi_crypt)) - self.assertFalse(pa.handler_is_deprecated("sha512_crypt")) - - self.assertTrue(pb.handler_is_deprecated("des_crypt")) - self.assertFalse(pb.handler_is_deprecated(hash.bsdi_crypt)) - self.assertFalse(pb.handler_is_deprecated("sha512_crypt")) - - #check categories as well - self.assertTrue(pb.handler_is_deprecated("des_crypt", "user")) - self.assertFalse(pb.handler_is_deprecated("bsdi_crypt", "user")) - self.assertTrue(pb.handler_is_deprecated("des_crypt", "admin")) - self.assertTrue(pb.handler_is_deprecated("bsdi_crypt", "admin")) - - # check deprecation is overridden per category - pc = CryptPolicy( - schemes=["md5_crypt", "des_crypt"], - deprecated=["md5_crypt"], - user__context__deprecated=["des_crypt"], - ) - self.assertTrue(pc.handler_is_deprecated("md5_crypt")) - self.assertFalse(pc.handler_is_deprecated("des_crypt")) - self.assertFalse(pc.handler_is_deprecated("md5_crypt", "user")) - self.assertTrue(pc.handler_is_deprecated("des_crypt", "user")) + # + # des_crypt + # + self.assertEqual(options(cc4, "des_crypt"), dict( + deprecated=True, + vary_rounds=0.1, + )) - def test_15_min_verify_time(self): - "test get_min_verify_time() method" - # silence deprecation warnings for min verify time - warnings.filterwarnings("ignore", category=DeprecationWarning) + self.assertEqual(options(cc4, "des_crypt", "user"), dict( + deprecated=True, # unconfigured category inherits from default + vary_rounds=0.1, + )) - pa = CryptPolicy() - self.assertEqual(pa.get_min_verify_time(), 0) - self.assertEqual(pa.get_min_verify_time('admin'), 0) + self.assertEqual(options(cc4, "des_crypt", "admin"), dict( + deprecated=True, # unchanged though overidden + vary_rounds=0.05, # global overridden per-cateogry + )) - pb = pa.replace(min_verify_time=.1) - self.assertEqual(pb.get_min_verify_time(), .1) - self.assertEqual(pb.get_min_verify_time('admin'), .1) + # + # bsdi_crypt + # + self.assertEqual(options(cc4, "bsdi_crypt"), dict( + vary_rounds=0.2, # overridden from all__vary_rounds + )) - pc = pa.replace(admin__context__min_verify_time=.2) - self.assertEqual(pc.get_min_verify_time(), 0) - self.assertEqual(pc.get_min_verify_time('admin'), .2) + self.assertEqual(options(cc4, "bsdi_crypt", "user"), dict( + vary_rounds=0.2, # unconfigured category inherits from default + )) - pd = pb.replace(admin__context__min_verify_time=.2) - self.assertEqual(pd.get_min_verify_time(), .1) - self.assertEqual(pd.get_min_verify_time('admin'), .2) + self.assertEqual(options(cc4, "bsdi_crypt", "admin"), dict( + vary_rounds=0.3, + deprecated=True, # deprecation set per-category + )) - #========================================================= - #serialization - #========================================================= - def test_20_iter_config(self): - "test iter_config() method" - p5 = CryptPolicy(**self.sample_config_5pd) - self.assertEqual(dict(p5.iter_config()), self.sample_config_5pd) - self.assertEqual(dict(p5.iter_config(resolve=True)), self.sample_config_5prd) - self.assertEqual(dict(p5.iter_config(ini=True)), self.sample_config_5pid) - - def test_21_to_dict(self): + def test_34_to_dict(self): "test to_dict() method" - p5 = CryptPolicy(**self.sample_config_5pd) - self.assertEqual(p5.to_dict(), self.sample_config_5pd) - self.assertEqual(p5.to_dict(resolve=True), self.sample_config_5prd) + # NOTE: this is tested all throughout this test case. + ctx = CryptContext(**self.sample_1_dict) + self.assertEqual(ctx.to_dict(), self.sample_1_dict) + self.assertEqual(ctx.to_dict(resolve=True), self.sample_1_resolved_dict) - def test_22_to_string(self): + def test_35_to_string(self): "test to_string() method" - pa = CryptPolicy(**self.sample_config_5pd) - s = pa.to_string() #NOTE: can't compare string directly, ordering etc may not match - pb = CryptPolicy.from_string(s) - self.assertEqual(pb.to_dict(), self.sample_config_5pd) - #========================================================= - # - #========================================================= + # create ctx and serialize + ctx = CryptContext(**self.sample_1_dict) + dump = ctx.to_string() -#========================================================= -#CryptContext -#========================================================= -class CryptContextTest(TestCase): - "test CryptContext class" - descriptionPrefix = "CryptContext" + # check ctx->string returns canonical format. + # NOTE: ConfigParser for PY26 and earlier didn't use OrderedDict, + # so to_string() won't get order correct. + # so we skip this test. + import sys + if sys.version_info >= (2,7): + self.assertEqual(dump, self.sample_1_unicode) + + # check ctx->string->ctx->dict returns original + ctx2 = CryptContext.from_string(dump) + self.assertEqual(ctx2.to_dict(), self.sample_1_dict) + + # TODO: test other features, like the unmanaged handler warning. + # TODO: test compact mode, section #========================================================= - #constructor + # password hash api #========================================================= - def test_00_constructor(self): - "test constructor" - #create crypt context using handlers - cc = CryptContext([hash.md5_crypt, hash.bsdi_crypt, hash.des_crypt]) - c,b,a = cc.policy.iter_handlers() - self.assertIs(a, hash.des_crypt) - self.assertIs(b, hash.bsdi_crypt) - self.assertIs(c, hash.md5_crypt) - - #create context using names - cc = CryptContext(["md5_crypt", "bsdi_crypt", "des_crypt"]) - c,b,a = cc.policy.iter_handlers() - self.assertIs(a, hash.des_crypt) - self.assertIs(b, hash.bsdi_crypt) - self.assertIs(c, hash.md5_crypt) - - #TODO: test policy & other options - - def test_01_replace(self): - "test replace()" - - cc = CryptContext(["md5_crypt", "bsdi_crypt", "des_crypt"]) - self.assertIs(cc.policy.get_handler(), hash.md5_crypt) - - cc2 = cc.replace() - self.assertIsNot(cc2, cc) - self.assertIs(cc2.policy, cc.policy) - - cc3 = cc.replace(default="bsdi_crypt") - self.assertIsNot(cc3, cc) - self.assertIsNot(cc3.policy, cc.policy) - self.assertIs(cc3.policy.get_handler(), hash.bsdi_crypt) - - def test_02_no_handlers(self): - "test no handlers" - - #check constructor... - cc = CryptContext() - self.assertRaises(KeyError, cc.identify, 'hash', required=True) - self.assertRaises(KeyError, cc.encrypt, 'secret') - self.assertRaises(KeyError, cc.verify, 'secret', 'hash') + nonstring_vectors = [ + (None, {}), + (None, {"scheme": "des_crypt"}), + (1, {}), + ((), {}), + ] + + def test_40_basic(self): + "test basic encrypt/identify/verify functionality" + handlers = [hash.md5_crypt, hash.des_crypt, hash.bsdi_crypt] + cc = CryptContext(handlers) + + #run through handlers + for crypt in handlers: + h = cc.encrypt("test", scheme=crypt.name) + self.assertEqual(cc.identify(h), crypt.name) + self.assertEqual(cc.identify(h, resolve=True), crypt) + self.assertTrue(cc.verify('test', h)) + self.assertTrue(not cc.verify('notest', h)) - #check updating policy after the fact... - cc = CryptContext(['md5_crypt']) - p = CryptPolicy(schemes=[]) - cc.policy = p + #test default + h = cc.encrypt("test") + self.assertEqual(cc.identify(h), "md5_crypt") - self.assertRaises(KeyError, cc.identify, 'hash', required=True) - self.assertRaises(KeyError, cc.encrypt, 'secret') - self.assertRaises(KeyError, cc.verify, 'secret', 'hash') + #test genhash + h = cc.genhash('secret', cc.genconfig()) + self.assertEqual(cc.identify(h), 'md5_crypt') - #========================================================= - #policy adaptation - #========================================================= - sample_policy_1 = dict( - schemes = [ "des_crypt", "md5_crypt", "phpass", "bsdi_crypt", - "sha256_crypt"], - deprecated = [ "des_crypt", ], - default = "sha256_crypt", - bsdi_crypt__max_rounds = 30, - bsdi_crypt__default_rounds = 25, - bsdi_crypt__vary_rounds = 0, - sha256_crypt__max_rounds = 3000, - sha256_crypt__min_rounds = 2000, - sha256_crypt__default_rounds = 3000, - phpass__ident = "H", - phpass__default_rounds = 7, - ) + h = cc.genhash('secret', cc.genconfig(), scheme='md5_crypt') + self.assertEqual(cc.identify(h), 'md5_crypt') - def test_10_01_genconfig_settings(self): - "test genconfig() settings" - cc = CryptContext(policy=None, - schemes=["md5_crypt", "phpass"], + self.assertRaises(ValueError, cc.genhash, 'secret', cc.genconfig(), scheme="des_crypt") + + def test_41_genconfig(self): + "test genconfig() method" + cc = CryptContext(schemes=["md5_crypt", "phpass"], phpass__ident="H", phpass__default_rounds=7, ) - # hash specific settings + # uses default scheme self.assertTrue(cc.genconfig().startswith("$1$")) - self.assertEqual( - cc.genconfig(scheme="phpass", salt='.'*8), - '$H$5........', - ) + + # override scheme + self.assertTrue(cc.genconfig(scheme="phpass").startswith("$H$5")) + + # override scheme & custom settings self.assertEqual( cc.genconfig(scheme="phpass", salt='.'*8, rounds=8, ident='P'), '$P$6........', ) - # unsupported hash settings should be rejected - self.assertRaises(KeyError, cc.replace, md5_crypt__ident="P") + #-------------------------------------------------------------- + # border cases + #-------------------------------------------------------------- + + # throws error without schemes + self.assertRaises(KeyError, CryptContext().genconfig) + + def test_42_genhash(self): + "test genhash() method" + + #-------------------------------------------------------------- + # border cases + #-------------------------------------------------------------- + + # rejects non-string secrets + cc = CryptContext(["des_crypt"]) + hash = cc.encrypt('stub') + for secret, kwds in self.nonstring_vectors: + self.assertRaises(TypeError, cc.genhash, secret, hash, **kwds) + + # rejects non-string hashes + cc = CryptContext(["des_crypt"]) + for hash, kwds in self.nonstring_vectors: + self.assertRaises(TypeError, cc.genhash, 'secret', hash, **kwds) + + # .. but should accept None if default scheme lacks config string + cc = CryptContext(["mysql323"]) + self.assertIsInstance(cc.genhash("stub", None), str) + + # throws error without schemes + self.assertRaises(KeyError, CryptContext().genhash, 'secret', 'hash') + + def test_43_encrypt(self): + "test encrypt() method" + cc = CryptContext(**self.sample_4_dict) + + # hash specific settings + self.assertEqual( + cc.encrypt("password", scheme="phpass", salt='.'*8), + '$H$5........De04R5Egz0aq8Tf.1eVhY/', + ) + self.assertEqual( + cc.encrypt("password", scheme="phpass", salt='.'*8, ident="P"), + '$P$5........De04R5Egz0aq8Tf.1eVhY/', + ) + + # NOTE: more thorough job of rounds limits done below. - def test_10_02_genconfig_rounds_limits(self): - "test genconfig() policy rounds limits" - cc = CryptContext(policy=None, - schemes=["sha256_crypt"], + # min rounds + with catch_warnings(record=True) as wlog: + self.assertEqual( + cc.encrypt("password", rounds=1999, salt="nacl"), + '$5$rounds=2000$nacl$9/lTZ5nrfPuz8vphznnmHuDGFuvjSNvOEDsGmGfsS97', + ) + self.consumeWarningList(wlog, PasslibConfigWarning) + + self.assertEqual( + cc.encrypt("password", rounds=2001, salt="nacl"), + '$5$rounds=2001$nacl$8PdeoPL4aXQnJ0woHhqgIw/efyfCKC2WHneOpnvF.31' + ) + self.consumeWarningList(wlog) + + # NOTE: max rounds, etc tested in genconfig() + + # make default > max throws error if attempted + self.assertRaises(ValueError, cc.copy, + sha256_crypt__default_rounds=4000) + + #-------------------------------------------------------------- + # border cases + #-------------------------------------------------------------- + + # rejects non-string secrets + cc = CryptContext(["des_crypt"]) + for secret, kwds in self.nonstring_vectors: + self.assertRaises(TypeError, cc.encrypt, secret, **kwds) + + # throws error without schemes + self.assertRaises(KeyError, CryptContext().encrypt, 'secret') + + def test_44_identify(self): + "test identify() border cases" + handlers = ["md5_crypt", "des_crypt", "bsdi_crypt"] + cc = CryptContext(handlers) + + #check unknown hash + self.assertEqual(cc.identify('$9$232323123$1287319827'), None) + self.assertRaises(ValueError, cc.identify, '$9$232323123$1287319827', required=True) + + #-------------------------------------------------------------- + # border cases + #-------------------------------------------------------------- + + # rejects non-string hashes + cc = CryptContext(["des_crypt"]) + for hash, kwds in self.nonstring_vectors: + self.assertRaises(TypeError, cc.identify, hash, **kwds) + + # throws error without schemes + cc = CryptContext() + self.assertIs(cc.identify('hash'), None) + self.assertRaises(KeyError, cc.identify, 'hash', required=True) + + def test_45_verify(self): + "test verify() scheme kwd" + handlers = ["md5_crypt", "des_crypt", "bsdi_crypt"] + cc = CryptContext(handlers) + + h = hash.md5_crypt.encrypt("test") + + #check base verify + self.assertTrue(cc.verify("test", h)) + self.assertTrue(not cc.verify("notest", h)) + + #check verify using right alg + self.assertTrue(cc.verify('test', h, scheme='md5_crypt')) + self.assertTrue(not cc.verify('notest', h, scheme='md5_crypt')) + + #check verify using wrong alg + self.assertRaises(ValueError, cc.verify, 'test', h, scheme='bsdi_crypt') + + #-------------------------------------------------------------- + # border cases + #-------------------------------------------------------------- + + # rejects non-string secrets + cc = CryptContext(["des_crypt"]) + h = cc.encrypt('stub') + for secret, kwds in self.nonstring_vectors: + self.assertRaises(TypeError, cc.verify, secret, h, **kwds) + + # rejects non-string hashes + cc = CryptContext(["des_crypt"]) + for h, kwds in self.nonstring_vectors: + self.assertRaises(TypeError, cc.verify, 'secret', h, **kwds) + + # throws error without schemes + self.assertRaises(KeyError, CryptContext().verify, 'secret', 'hash') + + def test_46_hash_needs_update(self): + "test hash_needs_update() method" + cc = CryptContext(**self.sample_4_dict) + + #check deprecated scheme + self.assertTrue(cc.hash_needs_update('9XXD4trGYeGJA')) + self.assertFalse(cc.hash_needs_update('$1$J8HC2RCr$HcmM.7NxB2weSvlw2FgzU0')) + + #check min rounds + self.assertTrue(cc.hash_needs_update('$5$rounds=1999$jD81UCoo.zI.UETs$Y7qSTQ6mTiU9qZB4fRr43wRgQq4V.5AAf7F97Pzxey/')) + self.assertFalse(cc.hash_needs_update('$5$rounds=2000$228SSRje04cnNCaQ$YGV4RYu.5sNiBvorQDlO0WWQjyJVGKBcJXz3OtyQ2u8')) + + #check max rounds + self.assertFalse(cc.hash_needs_update('$5$rounds=3000$fS9iazEwTKi7QPW4$VasgBC8FqlOvD7x2HhABaMXCTh9jwHclPA9j5YQdns.')) + self.assertTrue(cc.hash_needs_update('$5$rounds=3001$QlFHHifXvpFX4PLs$/0ekt7lSs/lOikSerQ0M/1porEHxYq7W/2hdFpxA3fA')) + + #-------------------------------------------------------------- + # border cases + #-------------------------------------------------------------- + + # rejects non-string hashes + cc = CryptContext(["des_crypt"]) + for hash, kwds in self.nonstring_vectors: + self.assertRaises(TypeError, cc.hash_needs_update, hash, **kwds) + + # throws error without schemes + self.assertRaises(KeyError, CryptContext().hash_needs_update, 'hash') + + def test_47_verify_and_update(self): + "test verify_and_update()" + cc = CryptContext(**self.sample_4_dict) + + #create some hashes + h1 = cc.encrypt("password", scheme="des_crypt") + h2 = cc.encrypt("password", scheme="sha256_crypt") + + #check bad password, deprecated hash + ok, new_hash = cc.verify_and_update("wrongpass", h1) + self.assertFalse(ok) + self.assertIs(new_hash, None) + + #check bad password, good hash + ok, new_hash = cc.verify_and_update("wrongpass", h2) + self.assertFalse(ok) + self.assertIs(new_hash, None) + + #check right password, deprecated hash + ok, new_hash = cc.verify_and_update("password", h1) + self.assertTrue(ok) + self.assertTrue(cc.identify(new_hash), "sha256_crypt") + + #check right password, good hash + ok, new_hash = cc.verify_and_update("password", h2) + self.assertTrue(ok) + self.assertIs(new_hash, None) + + #-------------------------------------------------------------- + # border cases + #-------------------------------------------------------------- + + # rejects non-string secrets + cc = CryptContext(["des_crypt"]) + hash = cc.encrypt('stub') + for secret, kwds in self.nonstring_vectors: + self.assertRaises(TypeError, cc.verify_and_update, secret, hash, **kwds) + + # rejects non-string hashes + cc = CryptContext(["des_crypt"]) + for hash, kwds in self.nonstring_vectors: + self.assertRaises(TypeError, cc.verify_and_update, 'secret', hash, **kwds) + + # throws error without schemes + self.assertRaises(KeyError, CryptContext().verify_and_update, 'secret', 'hash') + + #========================================================= + # rounds options + #========================================================= + # NOTE: the follow tests check how _CryptRecord handles + # the min/max/default/vary_rounds options, via the output of + # genconfig(). it's assumed encrypt() takes the same codepath. + + def test_50_rounds_limits(self): + "test rounds limits" + cc = CryptContext(schemes=["sha256_crypt"], all__min_rounds=2000, all__max_rounds=3000, all__default_rounds=2500, @@ -631,7 +1031,7 @@ class CryptContextTest(TestCase): with catch_warnings(record=True) as wlog: # set below handler min - c2 = cc.replace(all__min_rounds=500, all__max_rounds=None, + c2 = cc.copy(all__min_rounds=500, all__max_rounds=None, all__default_rounds=500) self.consumeWarningList(wlog, [PasslibConfigWarning]*2) self.assertEqual(c2.genconfig(salt="nacl"), "$5$rounds=1000$nacl$") @@ -661,7 +1061,7 @@ class CryptContextTest(TestCase): # max rounds with catch_warnings(record=True) as wlog: # set above handler max - c2 = cc.replace(all__max_rounds=int(1e9)+500, all__min_rounds=None, + c2 = cc.copy(all__max_rounds=int(1e9)+500, all__min_rounds=None, all__default_rounds=int(1e9)+500) self.consumeWarningList(wlog, [PasslibConfigWarning]*2) self.assertEqual(c2.genconfig(salt="nacl"), @@ -693,225 +1093,119 @@ class CryptContextTest(TestCase): self.assertEqual(cc.genconfig(salt="nacl"), '$5$rounds=2500$nacl$') # fallback default rounds - use handler's - c2 = cc.replace(all__default_rounds=None, all__max_rounds=50000) - self.assertEqual(c2.genconfig(salt="nacl"), '$5$rounds=40000$nacl$') + df = hash.sha256_crypt.default_rounds + c2 = cc.copy(all__default_rounds=None, all__max_rounds=df<<1) + self.assertEqual(c2.genconfig(salt="nacl"), + '$5$rounds=%d$nacl$' % df) # fallback default rounds - use handler's, but clipped to max rounds - c2 = cc.replace(all__default_rounds=None, all__max_rounds=3000) + c2 = cc.copy(all__default_rounds=None, all__max_rounds=3000) self.assertEqual(c2.genconfig(salt="nacl"), '$5$rounds=3000$nacl$') # TODO: test default falls back to mx / mn if handler has no default. #default rounds - out of bounds - self.assertRaises(ValueError, cc.replace, all__default_rounds=1999) - cc.policy.replace(all__default_rounds=2000) - cc.policy.replace(all__default_rounds=3000) - self.assertRaises(ValueError, cc.replace, all__default_rounds=3001) + self.assertRaises(ValueError, cc.copy, all__default_rounds=1999) + cc.copy(all__default_rounds=2000) + cc.copy(all__default_rounds=3000) + self.assertRaises(ValueError, cc.copy, all__default_rounds=3001) # invalid min/max bounds - c2 = CryptContext(policy=None, schemes=["sha256_crypt"]) - self.assertRaises(ValueError, c2.replace, all__min_rounds=-1) - self.assertRaises(ValueError, c2.replace, all__max_rounds=-1) - self.assertRaises(ValueError, c2.replace, all__min_rounds=2000, + c2 = CryptContext(schemes=["sha256_crypt"]) + self.assertRaises(ValueError, c2.copy, all__min_rounds=-1) + self.assertRaises(ValueError, c2.copy, all__max_rounds=-1) + self.assertRaises(ValueError, c2.copy, all__min_rounds=2000, all__max_rounds=1999) - def test_10_03_genconfig_linear_vary_rounds(self): - "test genconfig() linear vary rounds" - cc = CryptContext(policy=None, - schemes=["sha256_crypt"], + def test_51_linear_vary_rounds(self): + "test linear vary rounds" + cc = CryptContext(schemes=["sha256_crypt"], all__min_rounds=1995, all__max_rounds=2005, all__default_rounds=2000, ) # test negative - self.assertRaises(ValueError, cc.replace, all__vary_rounds=-1) - self.assertRaises(ValueError, cc.replace, all__vary_rounds="-1%") - self.assertRaises(ValueError, cc.replace, all__vary_rounds="101%") + self.assertRaises(ValueError, cc.copy, all__vary_rounds=-1) + self.assertRaises(ValueError, cc.copy, all__vary_rounds="-1%") + self.assertRaises(ValueError, cc.copy, all__vary_rounds="101%") # test static - c2 = cc.replace(all__vary_rounds=0) + c2 = cc.copy(all__vary_rounds=0) self.assert_rounds_range(c2, "sha256_crypt", 2000, 2000) - c2 = cc.replace(all__vary_rounds="0%") + c2 = cc.copy(all__vary_rounds="0%") self.assert_rounds_range(c2, "sha256_crypt", 2000, 2000) # test absolute - c2 = cc.replace(all__vary_rounds=1) + c2 = cc.copy(all__vary_rounds=1) self.assert_rounds_range(c2, "sha256_crypt", 1999, 2001) - c2 = cc.replace(all__vary_rounds=100) + c2 = cc.copy(all__vary_rounds=100) self.assert_rounds_range(c2, "sha256_crypt", 1995, 2005) # test relative - c2 = cc.replace(all__vary_rounds="0.1%") + c2 = cc.copy(all__vary_rounds="0.1%") self.assert_rounds_range(c2, "sha256_crypt", 1998, 2002) - c2 = cc.replace(all__vary_rounds="100%") + c2 = cc.copy(all__vary_rounds="100%") self.assert_rounds_range(c2, "sha256_crypt", 1995, 2005) - def test_10_03_genconfig_log2_vary_rounds(self): - "test genconfig() log2 vary rounds" - cc = CryptContext(policy=None, - schemes=["bcrypt"], + def test_52_log2_vary_rounds(self): + "test log2 vary rounds" + cc = CryptContext(schemes=["bcrypt"], all__min_rounds=15, all__max_rounds=25, all__default_rounds=20, ) # test negative - self.assertRaises(ValueError, cc.replace, all__vary_rounds=-1) - self.assertRaises(ValueError, cc.replace, all__vary_rounds="-1%") - self.assertRaises(ValueError, cc.replace, all__vary_rounds="101%") + self.assertRaises(ValueError, cc.copy, all__vary_rounds=-1) + self.assertRaises(ValueError, cc.copy, all__vary_rounds="-1%") + self.assertRaises(ValueError, cc.copy, all__vary_rounds="101%") # test static - c2 = cc.replace(all__vary_rounds=0) + c2 = cc.copy(all__vary_rounds=0) self.assert_rounds_range(c2, "bcrypt", 20, 20) - c2 = cc.replace(all__vary_rounds="0%") + c2 = cc.copy(all__vary_rounds="0%") self.assert_rounds_range(c2, "bcrypt", 20, 20) # test absolute - c2 = cc.replace(all__vary_rounds=1) + c2 = cc.copy(all__vary_rounds=1) self.assert_rounds_range(c2, "bcrypt", 19, 21) - c2 = cc.replace(all__vary_rounds=100) + c2 = cc.copy(all__vary_rounds=100) self.assert_rounds_range(c2, "bcrypt", 15, 25) # test relative - should shift over at 50% mark - c2 = cc.replace(all__vary_rounds="1%") + c2 = cc.copy(all__vary_rounds="1%") self.assert_rounds_range(c2, "bcrypt", 20, 20) - c2 = cc.replace(all__vary_rounds="49%") + c2 = cc.copy(all__vary_rounds="49%") self.assert_rounds_range(c2, "bcrypt", 20, 20) - c2 = cc.replace(all__vary_rounds="50%") + c2 = cc.copy(all__vary_rounds="50%") self.assert_rounds_range(c2, "bcrypt", 19, 20) - c2 = cc.replace(all__vary_rounds="100%") + c2 = cc.copy(all__vary_rounds="100%") self.assert_rounds_range(c2, "bcrypt", 15, 21) def assert_rounds_range(self, context, scheme, lower, upper): "helper to check vary_rounds covers specified range" # NOTE: this runs enough times the min and max *should* be hit, # though there's a faint chance it will randomly fail. - handler = context.policy.get_handler(scheme) + handler = context.handler(scheme) salt = handler.default_salt_chars[0:1] * handler.max_salt_size seen = set() for i in irange(300): h = context.genconfig(scheme, salt=salt) r = handler.from_string(h).rounds seen.add(r) - self.assertEqual(min(seen), lower, "vary_rounds lower bound:") - self.assertEqual(max(seen), upper, "vary_rounds upper bound:") - - def test_11_encrypt_settings(self): - "test encrypt() honors policy settings" - cc = CryptContext(**self.sample_policy_1) - - # hash specific settings - self.assertEqual( - cc.encrypt("password", scheme="phpass", salt='.'*8), - '$H$5........De04R5Egz0aq8Tf.1eVhY/', - ) - self.assertEqual( - cc.encrypt("password", scheme="phpass", salt='.'*8, ident="P"), - '$P$5........De04R5Egz0aq8Tf.1eVhY/', - ) - - # NOTE: more thorough job of rounds limits done in genconfig() test, - # which is much cheaper, and shares the same codebase. - - # min rounds - with catch_warnings(record=True) as wlog: - self.assertEqual( - cc.encrypt("password", rounds=1999, salt="nacl"), - '$5$rounds=2000$nacl$9/lTZ5nrfPuz8vphznnmHuDGFuvjSNvOEDsGmGfsS97', - ) - self.consumeWarningList(wlog, PasslibConfigWarning) - - self.assertEqual( - cc.encrypt("password", rounds=2001, salt="nacl"), - '$5$rounds=2001$nacl$8PdeoPL4aXQnJ0woHhqgIw/efyfCKC2WHneOpnvF.31' - ) - self.consumeWarningList(wlog) - - # max rounds, etc tested in genconfig() - - # make default > max throws error if attempted - self.assertRaises(ValueError, cc.replace, - sha256_crypt__default_rounds=4000) - - def test_12_hash_needs_update(self): - "test hash_needs_update() method" - cc = CryptContext(**self.sample_policy_1) - - #check deprecated scheme - self.assertTrue(cc.hash_needs_update('9XXD4trGYeGJA')) - self.assertFalse(cc.hash_needs_update('$1$J8HC2RCr$HcmM.7NxB2weSvlw2FgzU0')) - - #check min rounds - self.assertTrue(cc.hash_needs_update('$5$rounds=1999$jD81UCoo.zI.UETs$Y7qSTQ6mTiU9qZB4fRr43wRgQq4V.5AAf7F97Pzxey/')) - self.assertFalse(cc.hash_needs_update('$5$rounds=2000$228SSRje04cnNCaQ$YGV4RYu.5sNiBvorQDlO0WWQjyJVGKBcJXz3OtyQ2u8')) - - #check max rounds - self.assertFalse(cc.hash_needs_update('$5$rounds=3000$fS9iazEwTKi7QPW4$VasgBC8FqlOvD7x2HhABaMXCTh9jwHclPA9j5YQdns.')) - self.assertTrue(cc.hash_needs_update('$5$rounds=3001$QlFHHifXvpFX4PLs$/0ekt7lSs/lOikSerQ0M/1porEHxYq7W/2hdFpxA3fA')) + self.assertEqual(min(seen), lower, "vary_rounds had wrong lower limit:") + self.assertEqual(max(seen), upper, "vary_rounds had wrong upper limit:") #========================================================= - #identify + # feature tests #========================================================= - def test_20_basic(self): - "test basic encrypt/identify/verify functionality" - handlers = [hash.md5_crypt, hash.des_crypt, hash.bsdi_crypt] - cc = CryptContext(handlers, policy=None) - - #run through handlers - for crypt in handlers: - h = cc.encrypt("test", scheme=crypt.name) - self.assertEqual(cc.identify(h), crypt.name) - self.assertEqual(cc.identify(h, resolve=True), crypt) - self.assertTrue(cc.verify('test', h)) - self.assertTrue(not cc.verify('notest', h)) - - #test default - h = cc.encrypt("test") - self.assertEqual(cc.identify(h), "md5_crypt") - - #test genhash - h = cc.genhash('secret', cc.genconfig()) - self.assertEqual(cc.identify(h), 'md5_crypt') - - h = cc.genhash('secret', cc.genconfig(), scheme='md5_crypt') - self.assertEqual(cc.identify(h), 'md5_crypt') - - self.assertRaises(ValueError, cc.genhash, 'secret', cc.genconfig(), scheme="des_crypt") - - def test_21_identify(self): - "test identify() border cases" - handlers = ["md5_crypt", "des_crypt", "bsdi_crypt"] - cc = CryptContext(handlers, policy=None) - - #check unknown hash - self.assertEqual(cc.identify('$9$232323123$1287319827'), None) - self.assertRaises(ValueError, cc.identify, '$9$232323123$1287319827', required=True) - - def test_22_verify(self): - "test verify() scheme kwd" - handlers = ["md5_crypt", "des_crypt", "bsdi_crypt"] - cc = CryptContext(handlers, policy=None) - - h = hash.md5_crypt.encrypt("test") - - #check base verify - self.assertTrue(cc.verify("test", h)) - self.assertTrue(not cc.verify("notest", h)) - - #check verify using right alg - self.assertTrue(cc.verify('test', h, scheme='md5_crypt')) - self.assertTrue(not cc.verify('notest', h, scheme='md5_crypt')) - - #check verify using wrong alg - self.assertRaises(ValueError, cc.verify, 'test', h, scheme='bsdi_crypt') - - def test_24_min_verify_time(self): + def test_60_min_verify_time(self): "test verify() honors min_verify_time" #NOTE: this whole test assumes time.sleep() and tick() # have better than 100ms accuracy - set via delta. @@ -967,111 +1261,10 @@ class CryptContextTest(TestCase): self.assertAlmostEqual(elapsed, max_delay, delta=delta) self.consumeWarningList(wlog, ".*verify exceeded min_verify_time") - def test_25_verify_and_update(self): - "test verify_and_update()" - cc = CryptContext(**self.sample_policy_1) - - #create some hashes - h1 = cc.encrypt("password", scheme="des_crypt") - h2 = cc.encrypt("password", scheme="sha256_crypt") - - #check bad password, deprecated hash - ok, new_hash = cc.verify_and_update("wrongpass", h1) - self.assertFalse(ok) - self.assertIs(new_hash, None) - - #check bad password, good hash - ok, new_hash = cc.verify_and_update("wrongpass", h2) - self.assertFalse(ok) - self.assertIs(new_hash, None) - - #check right password, deprecated hash - ok, new_hash = cc.verify_and_update("password", h1) - self.assertTrue(ok) - self.assertTrue(cc.identify(new_hash), "sha256_crypt") - - #check right password, good hash - ok, new_hash = cc.verify_and_update("password", h2) - self.assertTrue(ok) - self.assertIs(new_hash, None) - - #========================================================= - # border cases - #========================================================= - def test_30_nonstring_hash(self): - "test non-string hash values cause error" - # - # test hash=None or some other non-string causes TypeError - # and that explicit-scheme code path behaves the same. - # - cc = CryptContext(["des_crypt"]) - for hash, kwds in [ - (None, {}), - (None, {"scheme": "des_crypt"}), - (1, {}), - ((), {}), - ]: - - self.assertRaises(TypeError, cc.identify, hash, **kwds) - self.assertRaises(TypeError, cc.genhash, 'stub', hash, **kwds) - self.assertRaises(TypeError, cc.verify, 'stub', hash, **kwds) - self.assertRaises(TypeError, cc.verify_and_update, 'stub', hash, **kwds) - self.assertRaises(TypeError, cc.hash_needs_update, hash, **kwds) - - # - # but genhash *should* accept None if default scheme lacks config string. - # - cc2 = CryptContext(["mysql323"]) - self.assertRaises(TypeError, cc2.identify, None) - self.assertIsInstance(cc2.genhash("stub", None), str) - self.assertRaises(TypeError, cc2.verify, 'stub', None) - self.assertRaises(TypeError, cc2.verify_and_update, 'stub', None) - self.assertRaises(TypeError, cc2.hash_needs_update, None) - - - def test_31_nonstring_secret(self): - "test non-string password values cause error" - cc = CryptContext(["des_crypt"]) - hash = cc.encrypt("stub") - # - # test secret=None, or some other non-string causes TypeError - # - for secret, kwds in [ - (None, {}), - (None, {"scheme": "des_crypt"}), - (1, {}), - ((), {}), - ]: - self.assertRaises(TypeError, cc.encrypt, secret, **kwds) - self.assertRaises(TypeError, cc.genhash, secret, hash, **kwds) - self.assertRaises(TypeError, cc.verify, secret, hash, **kwds) - self.assertRaises(TypeError, cc.verify_and_update, secret, hash, **kwds) - - #========================================================= - # other - #========================================================= - def test_90_bcrypt_normhash(self): - "teset verify_and_update / hash_needs_update corrects bcrypt padding" - # see issue 25. - bcrypt = hash.bcrypt - - PASS1 = "loppux" - BAD1 = "$2a$12$oaQbBqq8JnSM1NHRPQGXORm4GCUMqp7meTnkft4zgSnrbhoKdDV0C" - GOOD1 = "$2a$12$oaQbBqq8JnSM1NHRPQGXOOm4GCUMqp7meTnkft4zgSnrbhoKdDV0C" - ctx = CryptContext(["bcrypt"]) - - with catch_warnings(record=True) as wlog: - self.assertTrue(ctx.hash_needs_update(BAD1)) - self.assertFalse(ctx.hash_needs_update(GOOD1)) - - if bcrypt.has_backend(): - self.assertEqual(ctx.verify_and_update(PASS1,GOOD1), (True,None)) - self.assertEqual(ctx.verify_and_update("x",BAD1), (False,None)) - res = ctx.verify_and_update(PASS1, BAD1) - self.assertTrue(res[0] and res[1] and res[1] != BAD1) - - def test_91_passprep(self): + def test_61_passprep(self): "test passprep option" + self.require_stringprep() + # saslprep should normalize pu -> pn pu = u("a\u0300") # unnormalized unicode pn = u("\u00E0") # normalized unicode @@ -1126,8 +1319,45 @@ class CryptContextTest(TestCase): self.assertFalse(ctx.verify(pu, ctx.encrypt(pn, scheme="md5_crypt"))) self.assertTrue(ctx.verify(pu, ctx.encrypt(pn, scheme="sha256_crypt"))) + def test_62_bcrypt_update(self): + "test verify_and_update / hash_needs_update corrects bcrypt padding" + # see issue 25. + bcrypt = hash.bcrypt + + PASS1 = "loppux" + BAD1 = "$2a$12$oaQbBqq8JnSM1NHRPQGXORm4GCUMqp7meTnkft4zgSnrbhoKdDV0C" + GOOD1 = "$2a$12$oaQbBqq8JnSM1NHRPQGXOOm4GCUMqp7meTnkft4zgSnrbhoKdDV0C" + ctx = CryptContext(["bcrypt"]) + + with catch_warnings(record=True) as wlog: + self.assertTrue(ctx.hash_needs_update(BAD1)) + self.assertFalse(ctx.hash_needs_update(GOOD1)) + + if bcrypt.has_backend(): + self.assertEqual(ctx.verify_and_update(PASS1,GOOD1), (True,None)) + self.assertEqual(ctx.verify_and_update("x",BAD1), (False,None)) + ok, new_hash = ctx.verify_and_update(PASS1, BAD1) + self.assertTrue(ok) + self.assertTrue(new_hash and new_hash != BAD1) + + def test_63_bsdi_crypt_update(self): + "test verify_and_update / hash_needs_update correct bsdi even rounds" + even_hash = '_Y/../cG0zkJa6LY6k4c' + odd_hash = '_Z/..TgFg0/ptQtpAgws' + secret = 'test' + ctx = CryptContext(['bsdi_crypt']) + + self.assertTrue(ctx.hash_needs_update(even_hash)) + self.assertFalse(ctx.hash_needs_update(odd_hash)) + + self.assertEqual(ctx.verify_and_update(secret, odd_hash), (True,None)) + self.assertEqual(ctx.verify_and_update("x", even_hash), (False,None)) + ok, new_hash = ctx.verify_and_update(secret, even_hash) + self.assertTrue(ok) + self.assertTrue(new_hash and new_hash != even_hash) + #========================================================= - #eoc + # eoc #========================================================= #========================================================= @@ -1153,44 +1383,25 @@ class LazyCryptContextTest(TestCase): self.assertFalse(has_crypt_handler("dummy_2", True)) - self.assertTrue(cc.policy.handler_is_deprecated("des_crypt")) - self.assertEqual(cc.policy.schemes(), ["dummy_2", "des_crypt"]) + self.assertEqual(cc.schemes(), ("dummy_2", "des_crypt")) + self.assertTrue(cc._is_deprecated_scheme("des_crypt")) self.assertTrue(has_crypt_handler("dummy_2", True)) def test_callable_constructor(self): - "test create_policy() hook, returning CryptPolicy" - self.assertFalse(has_crypt_handler("dummy_2")) - register_crypt_handler_path("dummy_2", "passlib.tests.test_context") - - def create_policy(flag=False): - self.assertTrue(flag) - return CryptPolicy(schemes=iter(["dummy_2", "des_crypt"]), deprecated=["des_crypt"]) - - cc = LazyCryptContext(create_policy=create_policy, flag=True) - - self.assertFalse(has_crypt_handler("dummy_2", True)) - - self.assertTrue(cc.policy.handler_is_deprecated("des_crypt")) - self.assertEqual(cc.policy.schemes(), ["dummy_2", "des_crypt"]) - - self.assertTrue(has_crypt_handler("dummy_2", True)) - - def test_callable_constructor2(self): - "test create_policy() hook, returning dict" self.assertFalse(has_crypt_handler("dummy_2")) register_crypt_handler_path("dummy_2", "passlib.tests.test_context") - def create_policy(flag=False): + def onload(flag=False): self.assertTrue(flag) return dict(schemes=iter(["dummy_2", "des_crypt"]), deprecated=["des_crypt"]) - cc = LazyCryptContext(create_policy=create_policy, flag=True) + cc = LazyCryptContext(onload=onload, flag=True) self.assertFalse(has_crypt_handler("dummy_2", True)) - self.assertTrue(cc.policy.handler_is_deprecated("des_crypt")) - self.assertEqual(cc.policy.schemes(), ["dummy_2", "des_crypt"]) + self.assertEqual(cc.schemes(), ("dummy_2", "des_crypt")) + self.assertTrue(cc._is_deprecated_scheme("des_crypt")) self.assertTrue(has_crypt_handler("dummy_2", True)) diff --git a/passlib/tests/test_context_deprecated.py b/passlib/tests/test_context_deprecated.py new file mode 100644 index 0000000..f6d33d8 --- /dev/null +++ b/passlib/tests/test_context_deprecated.py @@ -0,0 +1,1165 @@ +"""tests for passlib.context + +this file is a clone of the 1.5 test_context.py, +containing the tests using the legacy CryptPolicy api. +it's being preserved here to ensure the old api doesn't break +(until Passlib 1.8, when this and the legacy api will be removed). +""" +#========================================================= +#imports +#========================================================= +from __future__ import with_statement +#core +import hashlib +from logging import getLogger +import os +import time +import warnings +import sys +#site +try: + from pkg_resources import resource_filename +except ImportError: + resource_filename = None +#pkg +from passlib import hash +from passlib.context import CryptContext, CryptPolicy, LazyCryptContext +from passlib.exc import PasslibConfigWarning +from passlib.utils import tick, to_bytes, to_unicode +from passlib.utils.compat import irange, u +import passlib.utils.handlers as uh +from passlib.tests.utils import TestCase, mktemp, catch_warnings, \ + gae_env, set_file +from passlib.registry import register_crypt_handler_path, has_crypt_handler, \ + _unload_handler_name as unload_handler_name +#module +log = getLogger(__name__) + +#========================================================= +# +#========================================================= +class CryptPolicyTest(TestCase): + "test CryptPolicy object" + + #TODO: need to test user categories w/in all this + + descriptionPrefix = "CryptPolicy" + + #========================================================= + #sample crypt policies used for testing + #========================================================= + + #----------------------------------------------------- + #sample 1 - average config file + #----------------------------------------------------- + #NOTE: copy of this is stored in file passlib/tests/sample_config_1s.cfg + sample_config_1s = """\ +[passlib] +schemes = des_crypt, md5_crypt, bsdi_crypt, sha512_crypt +default = md5_crypt +all.vary_rounds = 10%% +bsdi_crypt.max_rounds = 30000 +bsdi_crypt.default_rounds = 25000 +sha512_crypt.max_rounds = 50000 +sha512_crypt.min_rounds = 40000 +""" + sample_config_1s_path = os.path.abspath(os.path.join( + os.path.dirname(__file__), "sample_config_1s.cfg")) + if not os.path.exists(sample_config_1s_path) and resource_filename: + #in case we're zipped up in an egg. + sample_config_1s_path = resource_filename("passlib.tests", + "sample_config_1s.cfg") + + #make sure sample_config_1s uses \n linesep - tests rely on this + assert sample_config_1s.startswith("[passlib]\nschemes") + + sample_config_1pd = dict( + schemes = [ "des_crypt", "md5_crypt", "bsdi_crypt", "sha512_crypt"], + default = "md5_crypt", + # NOTE: not maintaining backwards compat for rendering to "10%" + all__vary_rounds = 0.1, + bsdi_crypt__max_rounds = 30000, + bsdi_crypt__default_rounds = 25000, + sha512_crypt__max_rounds = 50000, + sha512_crypt__min_rounds = 40000, + ) + + sample_config_1pid = { + "schemes": "des_crypt, md5_crypt, bsdi_crypt, sha512_crypt", + "default": "md5_crypt", + # NOTE: not maintaining backwards compat for rendering to "10%" + "all.vary_rounds": 0.1, + "bsdi_crypt.max_rounds": 30000, + "bsdi_crypt.default_rounds": 25000, + "sha512_crypt.max_rounds": 50000, + "sha512_crypt.min_rounds": 40000, + } + + sample_config_1prd = dict( + schemes = [ hash.des_crypt, hash.md5_crypt, hash.bsdi_crypt, hash.sha512_crypt], + default = "md5_crypt", # NOTE: passlib <= 1.5 was handler obj. + # NOTE: not maintaining backwards compat for rendering to "10%" + all__vary_rounds = 0.1, + bsdi_crypt__max_rounds = 30000, + bsdi_crypt__default_rounds = 25000, + sha512_crypt__max_rounds = 50000, + sha512_crypt__min_rounds = 40000, + ) + + #----------------------------------------------------- + #sample 2 - partial policy & result of overlay on sample 1 + #----------------------------------------------------- + sample_config_2s = """\ +[passlib] +bsdi_crypt.min_rounds = 29000 +bsdi_crypt.max_rounds = 35000 +bsdi_crypt.default_rounds = 31000 +sha512_crypt.min_rounds = 45000 +""" + + sample_config_2pd = dict( + #using this to test full replacement of existing options + bsdi_crypt__min_rounds = 29000, + bsdi_crypt__max_rounds = 35000, + bsdi_crypt__default_rounds = 31000, + #using this to test partial replacement of existing options + sha512_crypt__min_rounds=45000, + ) + + sample_config_12pd = dict( + schemes = [ "des_crypt", "md5_crypt", "bsdi_crypt", "sha512_crypt"], + default = "md5_crypt", + # NOTE: not maintaining backwards compat for rendering to "10%" + all__vary_rounds = 0.1, + bsdi_crypt__min_rounds = 29000, + bsdi_crypt__max_rounds = 35000, + bsdi_crypt__default_rounds = 31000, + sha512_crypt__max_rounds = 50000, + sha512_crypt__min_rounds=45000, + ) + + #----------------------------------------------------- + #sample 3 - just changing default + #----------------------------------------------------- + sample_config_3pd = dict( + default="sha512_crypt", + ) + + sample_config_123pd = dict( + schemes = [ "des_crypt", "md5_crypt", "bsdi_crypt", "sha512_crypt"], + default = "sha512_crypt", + # NOTE: not maintaining backwards compat for rendering to "10%" + all__vary_rounds = 0.1, + bsdi_crypt__min_rounds = 29000, + bsdi_crypt__max_rounds = 35000, + bsdi_crypt__default_rounds = 31000, + sha512_crypt__max_rounds = 50000, + sha512_crypt__min_rounds=45000, + ) + + #----------------------------------------------------- + #sample 4 - category specific + #----------------------------------------------------- + sample_config_4s = """ +[passlib] +schemes = sha512_crypt +all.vary_rounds = 10%% +default.sha512_crypt.max_rounds = 20000 +admin.all.vary_rounds = 5%% +admin.sha512_crypt.max_rounds = 40000 +""" + + sample_config_4pd = dict( + schemes = [ "sha512_crypt" ], + # NOTE: not maintaining backwards compat for rendering to "10%" + all__vary_rounds = 0.1, + sha512_crypt__max_rounds = 20000, + # NOTE: not maintaining backwards compat for rendering to "5%" + admin__all__vary_rounds = 0.05, + admin__sha512_crypt__max_rounds = 40000, + ) + + #----------------------------------------------------- + #sample 5 - to_string & deprecation testing + #----------------------------------------------------- + sample_config_5s = sample_config_1s + """\ +deprecated = des_crypt +admin__context__deprecated = des_crypt, bsdi_crypt +""" + + sample_config_5pd = sample_config_1pd.copy() + sample_config_5pd.update( + deprecated = [ "des_crypt" ], + admin__context__deprecated = [ "des_crypt", "bsdi_crypt" ], + ) + + sample_config_5pid = sample_config_1pid.copy() + sample_config_5pid.update({ + "deprecated": "des_crypt", + "admin.context.deprecated": "des_crypt, bsdi_crypt", + }) + + sample_config_5prd = sample_config_1prd.copy() + sample_config_5prd.update({ + # XXX: should deprecated return the actual handlers in this case? + # would have to modify how policy stores info, for one. + "deprecated": ["des_crypt"], + "admin__context__deprecated": ["des_crypt", "bsdi_crypt"], + }) + + #========================================================= + #constructors + #========================================================= + def setUp(self): + TestCase.setUp(self) + warnings.filterwarnings("ignore", + r"The CryptPolicy class has been deprecated") + + def test_00_constructor(self): + "test CryptPolicy() constructor" + policy = CryptPolicy(**self.sample_config_1pd) + self.assertEqual(policy.to_dict(), self.sample_config_1pd) + + #check key with too many separators is rejected + self.assertRaises(TypeError, CryptPolicy, + schemes = [ "des_crypt", "md5_crypt", "bsdi_crypt", "sha512_crypt"], + bad__key__bsdi_crypt__max_rounds = 30000, + ) + + #check nameless handler rejected + class nameless(uh.StaticHandler): + name = None + self.assertRaises(ValueError, CryptPolicy, schemes=[nameless]) + + # check scheme must be name or crypt handler + self.assertRaises(TypeError, CryptPolicy, schemes=[uh.StaticHandler]) + + #check name conflicts are rejected + class dummy_1(uh.StaticHandler): + name = 'dummy_1' + self.assertRaises(KeyError, CryptPolicy, schemes=[dummy_1, dummy_1]) + + #with unknown deprecated value + self.assertRaises(KeyError, CryptPolicy, + schemes=['des_crypt'], + deprecated=['md5_crypt']) + + #with unknown default value + self.assertRaises(KeyError, CryptPolicy, + schemes=['des_crypt'], + default='md5_crypt') + + def test_01_from_path_simple(self): + "test CryptPolicy.from_path() constructor" + #NOTE: this is separate so it can also run under GAE + + #test preset stored in existing file + path = self.sample_config_1s_path + policy = CryptPolicy.from_path(path) + self.assertEqual(policy.to_dict(), self.sample_config_1pd) + + #test if path missing + self.assertRaises(EnvironmentError, CryptPolicy.from_path, path + 'xxx') + + def test_01_from_path(self): + "test CryptPolicy.from_path() constructor with encodings" + if gae_env: + return self.skipTest("GAE doesn't offer read/write filesystem access") + + path = mktemp() + + #test "\n" linesep + set_file(path, self.sample_config_1s) + policy = CryptPolicy.from_path(path) + self.assertEqual(policy.to_dict(), self.sample_config_1pd) + + #test "\r\n" linesep + set_file(path, self.sample_config_1s.replace("\n","\r\n")) + policy = CryptPolicy.from_path(path) + self.assertEqual(policy.to_dict(), self.sample_config_1pd) + + #test with custom encoding + uc2 = to_bytes(self.sample_config_1s, "utf-16", source_encoding="utf-8") + set_file(path, uc2) + policy = CryptPolicy.from_path(path, encoding="utf-16") + self.assertEqual(policy.to_dict(), self.sample_config_1pd) + + def test_02_from_string(self): + "test CryptPolicy.from_string() constructor" + #test "\n" linesep + policy = CryptPolicy.from_string(self.sample_config_1s) + self.assertEqual(policy.to_dict(), self.sample_config_1pd) + + #test "\r\n" linesep + policy = CryptPolicy.from_string( + self.sample_config_1s.replace("\n","\r\n")) + self.assertEqual(policy.to_dict(), self.sample_config_1pd) + + #test with unicode + data = to_unicode(self.sample_config_1s) + policy = CryptPolicy.from_string(data) + self.assertEqual(policy.to_dict(), self.sample_config_1pd) + + #test with non-ascii-compatible encoding + uc2 = to_bytes(self.sample_config_1s, "utf-16", source_encoding="utf-8") + policy = CryptPolicy.from_string(uc2, encoding="utf-16") + self.assertEqual(policy.to_dict(), self.sample_config_1pd) + + #test category specific options + policy = CryptPolicy.from_string(self.sample_config_4s) + self.assertEqual(policy.to_dict(), self.sample_config_4pd) + + def test_03_from_source(self): + "test CryptPolicy.from_source() constructor" + #pass it a path + policy = CryptPolicy.from_source(self.sample_config_1s_path) + self.assertEqual(policy.to_dict(), self.sample_config_1pd) + + #pass it a string + policy = CryptPolicy.from_source(self.sample_config_1s) + self.assertEqual(policy.to_dict(), self.sample_config_1pd) + + #pass it a dict (NOTE: make a copy to detect in-place modifications) + policy = CryptPolicy.from_source(self.sample_config_1pd.copy()) + self.assertEqual(policy.to_dict(), self.sample_config_1pd) + + #pass it existing policy + p2 = CryptPolicy.from_source(policy) + self.assertIs(policy, p2) + + #pass it something wrong + self.assertRaises(TypeError, CryptPolicy.from_source, 1) + self.assertRaises(TypeError, CryptPolicy.from_source, []) + + def test_04_from_sources(self): + "test CryptPolicy.from_sources() constructor" + + #pass it empty list + self.assertRaises(ValueError, CryptPolicy.from_sources, []) + + #pass it one-element list + policy = CryptPolicy.from_sources([self.sample_config_1s]) + self.assertEqual(policy.to_dict(), self.sample_config_1pd) + + #pass multiple sources + policy = CryptPolicy.from_sources( + [ + self.sample_config_1s_path, + self.sample_config_2s, + self.sample_config_3pd, + ]) + self.assertEqual(policy.to_dict(), self.sample_config_123pd) + + def test_05_replace(self): + "test CryptPolicy.replace() constructor" + + p1 = CryptPolicy(**self.sample_config_1pd) + + #check overlaying sample 2 + p2 = p1.replace(**self.sample_config_2pd) + self.assertEqual(p2.to_dict(), self.sample_config_12pd) + + #check repeating overlay makes no change + p2b = p2.replace(**self.sample_config_2pd) + self.assertEqual(p2b.to_dict(), self.sample_config_12pd) + + #check overlaying sample 3 + p3 = p2.replace(self.sample_config_3pd) + self.assertEqual(p3.to_dict(), self.sample_config_123pd) + + def test_06_forbidden(self): + "test CryptPolicy() forbidden kwds" + + #salt not allowed to be set + self.assertRaises(KeyError, CryptPolicy, + schemes=["des_crypt"], + des_crypt__salt="xx", + ) + self.assertRaises(KeyError, CryptPolicy, + schemes=["des_crypt"], + all__salt="xx", + ) + + #schemes not allowed for category + self.assertRaises(KeyError, CryptPolicy, + schemes=["des_crypt"], + user__context__schemes=["md5_crypt"], + ) + + #========================================================= + #reading + #========================================================= + def test_10_has_schemes(self): + "test has_schemes() method" + + p1 = CryptPolicy(**self.sample_config_1pd) + self.assertTrue(p1.has_schemes()) + + p3 = CryptPolicy(**self.sample_config_3pd) + self.assertTrue(not p3.has_schemes()) + + def test_11_iter_handlers(self): + "test iter_handlers() method" + + p1 = CryptPolicy(**self.sample_config_1pd) + s = self.sample_config_1prd['schemes'] + self.assertEqual(list(p1.iter_handlers()), s) + + p3 = CryptPolicy(**self.sample_config_3pd) + self.assertEqual(list(p3.iter_handlers()), []) + + def test_12_get_handler(self): + "test get_handler() method" + + p1 = CryptPolicy(**self.sample_config_1pd) + + #check by name + self.assertIs(p1.get_handler("bsdi_crypt"), hash.bsdi_crypt) + + #check by missing name + self.assertIs(p1.get_handler("sha256_crypt"), None) + self.assertRaises(KeyError, p1.get_handler, "sha256_crypt", required=True) + + #check default + self.assertIs(p1.get_handler(), hash.md5_crypt) + + def test_13_get_options(self): + "test get_options() method" + + p12 = CryptPolicy(**self.sample_config_12pd) + + self.assertEqual(p12.get_options("bsdi_crypt"),dict( + # NOTE: not maintaining backwards compat for rendering to "10%" + vary_rounds = 0.1, + min_rounds = 29000, + max_rounds = 35000, + default_rounds = 31000, + )) + + self.assertEqual(p12.get_options("sha512_crypt"),dict( + # NOTE: not maintaining backwards compat for rendering to "10%" + vary_rounds = 0.1, + min_rounds = 45000, + max_rounds = 50000, + )) + + p4 = CryptPolicy.from_string(self.sample_config_4s) + self.assertEqual(p4.get_options("sha512_crypt"), dict( + # NOTE: not maintaining backwards compat for rendering to "10%" + vary_rounds=0.1, + max_rounds=20000, + )) + + self.assertEqual(p4.get_options("sha512_crypt", "user"), dict( + # NOTE: not maintaining backwards compat for rendering to "10%" + vary_rounds=0.1, + max_rounds=20000, + )) + + self.assertEqual(p4.get_options("sha512_crypt", "admin"), dict( + # NOTE: not maintaining backwards compat for rendering to "5%" + vary_rounds=0.05, + max_rounds=40000, + )) + + def test_14_handler_is_deprecated(self): + "test handler_is_deprecated() method" + pa = CryptPolicy(**self.sample_config_1pd) + pb = CryptPolicy(**self.sample_config_5pd) + + self.assertFalse(pa.handler_is_deprecated("des_crypt")) + self.assertFalse(pa.handler_is_deprecated(hash.bsdi_crypt)) + self.assertFalse(pa.handler_is_deprecated("sha512_crypt")) + + self.assertTrue(pb.handler_is_deprecated("des_crypt")) + self.assertFalse(pb.handler_is_deprecated(hash.bsdi_crypt)) + self.assertFalse(pb.handler_is_deprecated("sha512_crypt")) + + #check categories as well + self.assertTrue(pb.handler_is_deprecated("des_crypt", "user")) + self.assertFalse(pb.handler_is_deprecated("bsdi_crypt", "user")) + self.assertTrue(pb.handler_is_deprecated("des_crypt", "admin")) + self.assertTrue(pb.handler_is_deprecated("bsdi_crypt", "admin")) + + # check deprecation is overridden per category + pc = CryptPolicy( + schemes=["md5_crypt", "des_crypt"], + deprecated=["md5_crypt"], + user__context__deprecated=["des_crypt"], + ) + self.assertTrue(pc.handler_is_deprecated("md5_crypt")) + self.assertFalse(pc.handler_is_deprecated("des_crypt")) + self.assertFalse(pc.handler_is_deprecated("md5_crypt", "user")) + self.assertTrue(pc.handler_is_deprecated("des_crypt", "user")) + + def test_15_min_verify_time(self): + "test get_min_verify_time() method" + # silence deprecation warnings for min verify time + warnings.filterwarnings("ignore", category=DeprecationWarning) + + pa = CryptPolicy() + self.assertEqual(pa.get_min_verify_time(), 0) + self.assertEqual(pa.get_min_verify_time('admin'), 0) + + pb = pa.replace(min_verify_time=.1) + self.assertEqual(pb.get_min_verify_time(), .1) + self.assertEqual(pb.get_min_verify_time('admin'), .1) + + pc = pa.replace(admin__context__min_verify_time=.2) + self.assertEqual(pc.get_min_verify_time(), 0) + self.assertEqual(pc.get_min_verify_time('admin'), .2) + + pd = pb.replace(admin__context__min_verify_time=.2) + self.assertEqual(pd.get_min_verify_time(), .1) + self.assertEqual(pd.get_min_verify_time('admin'), .2) + + #========================================================= + #serialization + #========================================================= + def test_20_iter_config(self): + "test iter_config() method" + p5 = CryptPolicy(**self.sample_config_5pd) + self.assertEqual(dict(p5.iter_config()), self.sample_config_5pd) + self.assertEqual(dict(p5.iter_config(resolve=True)), self.sample_config_5prd) + self.assertEqual(dict(p5.iter_config(ini=True)), self.sample_config_5pid) + + def test_21_to_dict(self): + "test to_dict() method" + p5 = CryptPolicy(**self.sample_config_5pd) + self.assertEqual(p5.to_dict(), self.sample_config_5pd) + self.assertEqual(p5.to_dict(resolve=True), self.sample_config_5prd) + + def test_22_to_string(self): + "test to_string() method" + pa = CryptPolicy(**self.sample_config_5pd) + s = pa.to_string() #NOTE: can't compare string directly, ordering etc may not match + pb = CryptPolicy.from_string(s) + self.assertEqual(pb.to_dict(), self.sample_config_5pd) + + #========================================================= + # + #========================================================= + +#========================================================= +#CryptContext +#========================================================= +class CryptContextTest(TestCase): + "test CryptContext class" + descriptionPrefix = "CryptContext" + + def setUp(self): + TestCase.setUp(self) + warnings.filterwarnings("ignore", + r"CryptContext\(\)\.replace\(\) has been deprecated.*") + warnings.filterwarnings("ignore", + r"The CryptContext ``policy`` keyword has been deprecated.*") + warnings.filterwarnings("ignore", ".*(CryptPolicy|context\.policy).*(has|have) been deprecated.*") + + #========================================================= + #constructor + #========================================================= + def test_00_constructor(self): + "test constructor" + #create crypt context using handlers + cc = CryptContext([hash.md5_crypt, hash.bsdi_crypt, hash.des_crypt]) + c,b,a = cc.policy.iter_handlers() + self.assertIs(a, hash.des_crypt) + self.assertIs(b, hash.bsdi_crypt) + self.assertIs(c, hash.md5_crypt) + + #create context using names + cc = CryptContext(["md5_crypt", "bsdi_crypt", "des_crypt"]) + c,b,a = cc.policy.iter_handlers() + self.assertIs(a, hash.des_crypt) + self.assertIs(b, hash.bsdi_crypt) + self.assertIs(c, hash.md5_crypt) + + #TODO: test policy & other options + + def test_01_replace(self): + "test replace()" + + cc = CryptContext(["md5_crypt", "bsdi_crypt", "des_crypt"]) + self.assertIs(cc.policy.get_handler(), hash.md5_crypt) + + cc2 = cc.replace() + self.assertIsNot(cc2, cc) + # NOTE: was not able to maintain backward compatibility with this... + ##self.assertIs(cc2.policy, cc.policy) + + cc3 = cc.replace(default="bsdi_crypt") + self.assertIsNot(cc3, cc) + # NOTE: was not able to maintain backward compatibility with this... + ##self.assertIs(cc3.policy, cc.policy) + self.assertIs(cc3.policy.get_handler(), hash.bsdi_crypt) + + def test_02_no_handlers(self): + "test no handlers" + + #check constructor... + cc = CryptContext() + self.assertRaises(KeyError, cc.identify, 'hash', required=True) + self.assertRaises(KeyError, cc.encrypt, 'secret') + self.assertRaises(KeyError, cc.verify, 'secret', 'hash') + + #check updating policy after the fact... + cc = CryptContext(['md5_crypt']) + p = CryptPolicy(schemes=[]) + cc.policy = p + + self.assertRaises(KeyError, cc.identify, 'hash', required=True) + self.assertRaises(KeyError, cc.encrypt, 'secret') + self.assertRaises(KeyError, cc.verify, 'secret', 'hash') + + #========================================================= + #policy adaptation + #========================================================= + sample_policy_1 = dict( + schemes = [ "des_crypt", "md5_crypt", "phpass", "bsdi_crypt", + "sha256_crypt"], + deprecated = [ "des_crypt", ], + default = "sha256_crypt", + bsdi_crypt__max_rounds = 30, + bsdi_crypt__default_rounds = 25, + bsdi_crypt__vary_rounds = 0, + sha256_crypt__max_rounds = 3000, + sha256_crypt__min_rounds = 2000, + sha256_crypt__default_rounds = 3000, + phpass__ident = "H", + phpass__default_rounds = 7, + ) + + def test_10_01_genconfig_settings(self): + "test genconfig() settings" + cc = CryptContext(policy=None, + schemes=["md5_crypt", "phpass"], + phpass__ident="H", + phpass__default_rounds=7, + ) + + # hash specific settings + self.assertTrue(cc.genconfig().startswith("$1$")) + self.assertEqual( + cc.genconfig(scheme="phpass", salt='.'*8), + '$H$5........', + ) + self.assertEqual( + cc.genconfig(scheme="phpass", salt='.'*8, rounds=8, ident='P'), + '$P$6........', + ) + + # unsupported hash settings should be rejected + self.assertRaises(KeyError, cc.replace, md5_crypt__ident="P") + + def test_10_02_genconfig_rounds_limits(self): + "test genconfig() policy rounds limits" + cc = CryptContext(policy=None, + schemes=["sha256_crypt"], + all__min_rounds=2000, + all__max_rounds=3000, + all__default_rounds=2500, + ) + + # min rounds + with catch_warnings(record=True) as wlog: + + # set below handler min + c2 = cc.replace(all__min_rounds=500, all__max_rounds=None, + all__default_rounds=500) + self.consumeWarningList(wlog, [PasslibConfigWarning]*2) + self.assertEqual(c2.genconfig(salt="nacl"), "$5$rounds=1000$nacl$") + self.consumeWarningList(wlog) + + # below + self.assertEqual( + cc.genconfig(rounds=1999, salt="nacl"), + '$5$rounds=2000$nacl$', + ) + self.consumeWarningList(wlog, PasslibConfigWarning) + + # equal + self.assertEqual( + cc.genconfig(rounds=2000, salt="nacl"), + '$5$rounds=2000$nacl$', + ) + self.consumeWarningList(wlog) + + # above + self.assertEqual( + cc.genconfig(rounds=2001, salt="nacl"), + '$5$rounds=2001$nacl$' + ) + self.consumeWarningList(wlog) + + # max rounds + with catch_warnings(record=True) as wlog: + # set above handler max + c2 = cc.replace(all__max_rounds=int(1e9)+500, all__min_rounds=None, + all__default_rounds=int(1e9)+500) + self.consumeWarningList(wlog, [PasslibConfigWarning]*2) + self.assertEqual(c2.genconfig(salt="nacl"), + "$5$rounds=999999999$nacl$") + self.consumeWarningList(wlog) + + # above + self.assertEqual( + cc.genconfig(rounds=3001, salt="nacl"), + '$5$rounds=3000$nacl$' + ) + self.consumeWarningList(wlog, PasslibConfigWarning) + + # equal + self.assertEqual( + cc.genconfig(rounds=3000, salt="nacl"), + '$5$rounds=3000$nacl$' + ) + self.consumeWarningList(wlog) + + # below + self.assertEqual( + cc.genconfig(rounds=2999, salt="nacl"), + '$5$rounds=2999$nacl$', + ) + self.consumeWarningList(wlog) + + # explicit default rounds + self.assertEqual(cc.genconfig(salt="nacl"), '$5$rounds=2500$nacl$') + + # fallback default rounds - use handler's default + df = hash.sha256_crypt.default_rounds + c2 = cc.copy(all__default_rounds=None, all__max_rounds=df<<1) + self.assertEqual(c2.genconfig(salt="nacl"), + '$5$rounds=%d$nacl$' % df) + + # fallback default rounds - use handler's, but clipped to max rounds + c2 = cc.replace(all__default_rounds=None, all__max_rounds=3000) + self.assertEqual(c2.genconfig(salt="nacl"), '$5$rounds=3000$nacl$') + + # TODO: test default falls back to mx / mn if handler has no default. + + #default rounds - out of bounds + self.assertRaises(ValueError, cc.replace, all__default_rounds=1999) + cc.policy.replace(all__default_rounds=2000) + cc.policy.replace(all__default_rounds=3000) + self.assertRaises(ValueError, cc.replace, all__default_rounds=3001) + + # invalid min/max bounds + c2 = CryptContext(policy=None, schemes=["sha256_crypt"]) + self.assertRaises(ValueError, c2.replace, all__min_rounds=-1) + self.assertRaises(ValueError, c2.replace, all__max_rounds=-1) + self.assertRaises(ValueError, c2.replace, all__min_rounds=2000, + all__max_rounds=1999) + + def test_10_03_genconfig_linear_vary_rounds(self): + "test genconfig() linear vary rounds" + cc = CryptContext(policy=None, + schemes=["sha256_crypt"], + all__min_rounds=1995, + all__max_rounds=2005, + all__default_rounds=2000, + ) + + # test negative + self.assertRaises(ValueError, cc.replace, all__vary_rounds=-1) + self.assertRaises(ValueError, cc.replace, all__vary_rounds="-1%") + self.assertRaises(ValueError, cc.replace, all__vary_rounds="101%") + + # test static + c2 = cc.replace(all__vary_rounds=0) + self.assert_rounds_range(c2, "sha256_crypt", 2000, 2000) + + c2 = cc.replace(all__vary_rounds="0%") + self.assert_rounds_range(c2, "sha256_crypt", 2000, 2000) + + # test absolute + c2 = cc.replace(all__vary_rounds=1) + self.assert_rounds_range(c2, "sha256_crypt", 1999, 2001) + c2 = cc.replace(all__vary_rounds=100) + self.assert_rounds_range(c2, "sha256_crypt", 1995, 2005) + + # test relative + c2 = cc.replace(all__vary_rounds="0.1%") + self.assert_rounds_range(c2, "sha256_crypt", 1998, 2002) + c2 = cc.replace(all__vary_rounds="100%") + self.assert_rounds_range(c2, "sha256_crypt", 1995, 2005) + + def test_10_03_genconfig_log2_vary_rounds(self): + "test genconfig() log2 vary rounds" + cc = CryptContext(policy=None, + schemes=["bcrypt"], + all__min_rounds=15, + all__max_rounds=25, + all__default_rounds=20, + ) + + # test negative + self.assertRaises(ValueError, cc.replace, all__vary_rounds=-1) + self.assertRaises(ValueError, cc.replace, all__vary_rounds="-1%") + self.assertRaises(ValueError, cc.replace, all__vary_rounds="101%") + + # test static + c2 = cc.replace(all__vary_rounds=0) + self.assert_rounds_range(c2, "bcrypt", 20, 20) + + c2 = cc.replace(all__vary_rounds="0%") + self.assert_rounds_range(c2, "bcrypt", 20, 20) + + # test absolute + c2 = cc.replace(all__vary_rounds=1) + self.assert_rounds_range(c2, "bcrypt", 19, 21) + c2 = cc.replace(all__vary_rounds=100) + self.assert_rounds_range(c2, "bcrypt", 15, 25) + + # test relative - should shift over at 50% mark + c2 = cc.replace(all__vary_rounds="1%") + self.assert_rounds_range(c2, "bcrypt", 20, 20) + + c2 = cc.replace(all__vary_rounds="49%") + self.assert_rounds_range(c2, "bcrypt", 20, 20) + + c2 = cc.replace(all__vary_rounds="50%") + self.assert_rounds_range(c2, "bcrypt", 19, 20) + + c2 = cc.replace(all__vary_rounds="100%") + self.assert_rounds_range(c2, "bcrypt", 15, 21) + + def assert_rounds_range(self, context, scheme, lower, upper): + "helper to check vary_rounds covers specified range" + # NOTE: this runs enough times the min and max *should* be hit, + # though there's a faint chance it will randomly fail. + handler = context.policy.get_handler(scheme) + salt = handler.default_salt_chars[0:1] * handler.max_salt_size + seen = set() + for i in irange(300): + h = context.genconfig(scheme, salt=salt) + r = handler.from_string(h).rounds + seen.add(r) + self.assertEqual(min(seen), lower, "vary_rounds had wrong lower limit:") + self.assertEqual(max(seen), upper, "vary_rounds had wrong upper limit:") + + def test_11_encrypt_settings(self): + "test encrypt() honors policy settings" + cc = CryptContext(**self.sample_policy_1) + + # hash specific settings + self.assertEqual( + cc.encrypt("password", scheme="phpass", salt='.'*8), + '$H$5........De04R5Egz0aq8Tf.1eVhY/', + ) + self.assertEqual( + cc.encrypt("password", scheme="phpass", salt='.'*8, ident="P"), + '$P$5........De04R5Egz0aq8Tf.1eVhY/', + ) + + # NOTE: more thorough job of rounds limits done in genconfig() test, + # which is much cheaper, and shares the same codebase. + + # min rounds + with catch_warnings(record=True) as wlog: + self.assertEqual( + cc.encrypt("password", rounds=1999, salt="nacl"), + '$5$rounds=2000$nacl$9/lTZ5nrfPuz8vphznnmHuDGFuvjSNvOEDsGmGfsS97', + ) + self.consumeWarningList(wlog, PasslibConfigWarning) + + self.assertEqual( + cc.encrypt("password", rounds=2001, salt="nacl"), + '$5$rounds=2001$nacl$8PdeoPL4aXQnJ0woHhqgIw/efyfCKC2WHneOpnvF.31' + ) + self.consumeWarningList(wlog) + + # max rounds, etc tested in genconfig() + + # make default > max throws error if attempted + self.assertRaises(ValueError, cc.replace, + sha256_crypt__default_rounds=4000) + + def test_12_hash_needs_update(self): + "test hash_needs_update() method" + cc = CryptContext(**self.sample_policy_1) + + #check deprecated scheme + self.assertTrue(cc.hash_needs_update('9XXD4trGYeGJA')) + self.assertFalse(cc.hash_needs_update('$1$J8HC2RCr$HcmM.7NxB2weSvlw2FgzU0')) + + #check min rounds + self.assertTrue(cc.hash_needs_update('$5$rounds=1999$jD81UCoo.zI.UETs$Y7qSTQ6mTiU9qZB4fRr43wRgQq4V.5AAf7F97Pzxey/')) + self.assertFalse(cc.hash_needs_update('$5$rounds=2000$228SSRje04cnNCaQ$YGV4RYu.5sNiBvorQDlO0WWQjyJVGKBcJXz3OtyQ2u8')) + + #check max rounds + self.assertFalse(cc.hash_needs_update('$5$rounds=3000$fS9iazEwTKi7QPW4$VasgBC8FqlOvD7x2HhABaMXCTh9jwHclPA9j5YQdns.')) + self.assertTrue(cc.hash_needs_update('$5$rounds=3001$QlFHHifXvpFX4PLs$/0ekt7lSs/lOikSerQ0M/1porEHxYq7W/2hdFpxA3fA')) + + #========================================================= + #identify + #========================================================= + def test_20_basic(self): + "test basic encrypt/identify/verify functionality" + handlers = [hash.md5_crypt, hash.des_crypt, hash.bsdi_crypt] + cc = CryptContext(handlers, policy=None) + + #run through handlers + for crypt in handlers: + h = cc.encrypt("test", scheme=crypt.name) + self.assertEqual(cc.identify(h), crypt.name) + self.assertEqual(cc.identify(h, resolve=True), crypt) + self.assertTrue(cc.verify('test', h)) + self.assertTrue(not cc.verify('notest', h)) + + #test default + h = cc.encrypt("test") + self.assertEqual(cc.identify(h), "md5_crypt") + + #test genhash + h = cc.genhash('secret', cc.genconfig()) + self.assertEqual(cc.identify(h), 'md5_crypt') + + h = cc.genhash('secret', cc.genconfig(), scheme='md5_crypt') + self.assertEqual(cc.identify(h), 'md5_crypt') + + self.assertRaises(ValueError, cc.genhash, 'secret', cc.genconfig(), scheme="des_crypt") + + def test_21_identify(self): + "test identify() border cases" + handlers = ["md5_crypt", "des_crypt", "bsdi_crypt"] + cc = CryptContext(handlers, policy=None) + + #check unknown hash + self.assertEqual(cc.identify('$9$232323123$1287319827'), None) + self.assertRaises(ValueError, cc.identify, '$9$232323123$1287319827', required=True) + + def test_22_verify(self): + "test verify() scheme kwd" + handlers = ["md5_crypt", "des_crypt", "bsdi_crypt"] + cc = CryptContext(handlers, policy=None) + + h = hash.md5_crypt.encrypt("test") + + #check base verify + self.assertTrue(cc.verify("test", h)) + self.assertTrue(not cc.verify("notest", h)) + + #check verify using right alg + self.assertTrue(cc.verify('test', h, scheme='md5_crypt')) + self.assertTrue(not cc.verify('notest', h, scheme='md5_crypt')) + + #check verify using wrong alg + self.assertRaises(ValueError, cc.verify, 'test', h, scheme='bsdi_crypt') + + def test_24_min_verify_time(self): + "test verify() honors min_verify_time" + #NOTE: this whole test assumes time.sleep() and tick() + # have better than 100ms accuracy - set via delta. + delta = .05 + min_delay = 2*delta + min_verify_time = 5*delta + max_delay = 8*delta + + class TimedHash(uh.StaticHandler): + "psuedo hash that takes specified amount of time" + name = "timed_hash" + delay = 0 + + @classmethod + def identify(cls, hash): + return True + + def _calc_checksum(self, secret): + time.sleep(self.delay) + return to_unicode(secret + 'x') + + # silence deprecation warnings for min verify time + with catch_warnings(record=True) as wlog: + cc = CryptContext([TimedHash], min_verify_time=min_verify_time) + self.consumeWarningList(wlog, DeprecationWarning) + + def timecall(func, *args, **kwds): + start = tick() + result = func(*args, **kwds) + end = tick() + return end-start, result + + #verify genhash delay works + TimedHash.delay = min_delay + elapsed, result = timecall(TimedHash.genhash, 'stub', None) + self.assertEqual(result, 'stubx') + self.assertAlmostEqual(elapsed, min_delay, delta=delta) + + #ensure min verify time is honored + elapsed, result = timecall(cc.verify, "stub", "stubx") + self.assertTrue(result) + self.assertAlmostEqual(elapsed, min_delay, delta=delta) + + elapsed, result = timecall(cc.verify, "blob", "stubx") + self.assertFalse(result) + self.assertAlmostEqual(elapsed, min_verify_time, delta=delta) + + #ensure taking longer emits a warning. + TimedHash.delay = max_delay + with catch_warnings(record=True) as wlog: + elapsed, result = timecall(cc.verify, "blob", "stubx") + self.assertFalse(result) + self.assertAlmostEqual(elapsed, max_delay, delta=delta) + self.consumeWarningList(wlog, ".*verify exceeded min_verify_time") + + def test_25_verify_and_update(self): + "test verify_and_update()" + cc = CryptContext(**self.sample_policy_1) + + #create some hashes + h1 = cc.encrypt("password", scheme="des_crypt") + h2 = cc.encrypt("password", scheme="sha256_crypt") + + #check bad password, deprecated hash + ok, new_hash = cc.verify_and_update("wrongpass", h1) + self.assertFalse(ok) + self.assertIs(new_hash, None) + + #check bad password, good hash + ok, new_hash = cc.verify_and_update("wrongpass", h2) + self.assertFalse(ok) + self.assertIs(new_hash, None) + + #check right password, deprecated hash + ok, new_hash = cc.verify_and_update("password", h1) + self.assertTrue(ok) + self.assertTrue(cc.identify(new_hash), "sha256_crypt") + + #check right password, good hash + ok, new_hash = cc.verify_and_update("password", h2) + self.assertTrue(ok) + self.assertIs(new_hash, None) + + #========================================================= + # border cases + #========================================================= + def test_30_nonstring_hash(self): + "test non-string hash values cause error" + # + # test hash=None or some other non-string causes TypeError + # and that explicit-scheme code path behaves the same. + # + cc = CryptContext(["des_crypt"]) + for hash, kwds in [ + (None, {}), + (None, {"scheme": "des_crypt"}), + (1, {}), + ((), {}), + ]: + + self.assertRaises(TypeError, cc.identify, hash, **kwds) + self.assertRaises(TypeError, cc.genhash, 'stub', hash, **kwds) + self.assertRaises(TypeError, cc.verify, 'stub', hash, **kwds) + self.assertRaises(TypeError, cc.verify_and_update, 'stub', hash, **kwds) + self.assertRaises(TypeError, cc.hash_needs_update, hash, **kwds) + + # + # but genhash *should* accept None if default scheme lacks config string. + # + cc2 = CryptContext(["mysql323"]) + self.assertRaises(TypeError, cc2.identify, None) + self.assertIsInstance(cc2.genhash("stub", None), str) + self.assertRaises(TypeError, cc2.verify, 'stub', None) + self.assertRaises(TypeError, cc2.verify_and_update, 'stub', None) + self.assertRaises(TypeError, cc2.hash_needs_update, None) + + + def test_31_nonstring_secret(self): + "test non-string password values cause error" + cc = CryptContext(["des_crypt"]) + hash = cc.encrypt("stub") + # + # test secret=None, or some other non-string causes TypeError + # + for secret, kwds in [ + (None, {}), + (None, {"scheme": "des_crypt"}), + (1, {}), + ((), {}), + ]: + self.assertRaises(TypeError, cc.encrypt, secret, **kwds) + self.assertRaises(TypeError, cc.genhash, secret, hash, **kwds) + self.assertRaises(TypeError, cc.verify, secret, hash, **kwds) + self.assertRaises(TypeError, cc.verify_and_update, secret, hash, **kwds) + + #========================================================= + # other + #========================================================= + def test_90_bcrypt_normhash(self): + "teset verify_and_update / hash_needs_update corrects bcrypt padding" + # see issue 25. + bcrypt = hash.bcrypt + + PASS1 = "loppux" + BAD1 = "$2a$12$oaQbBqq8JnSM1NHRPQGXORm4GCUMqp7meTnkft4zgSnrbhoKdDV0C" + GOOD1 = "$2a$12$oaQbBqq8JnSM1NHRPQGXOOm4GCUMqp7meTnkft4zgSnrbhoKdDV0C" + ctx = CryptContext(["bcrypt"]) + + with catch_warnings(record=True) as wlog: + self.assertTrue(ctx.hash_needs_update(BAD1)) + self.assertFalse(ctx.hash_needs_update(GOOD1)) + + if bcrypt.has_backend(): + self.assertEqual(ctx.verify_and_update(PASS1,GOOD1), (True,None)) + self.assertEqual(ctx.verify_and_update("x",BAD1), (False,None)) + res = ctx.verify_and_update(PASS1, BAD1) + self.assertTrue(res[0] and res[1] and res[1] != BAD1) + + #========================================================= + #eoc + #========================================================= + +#========================================================= +#LazyCryptContext +#========================================================= +class dummy_2(uh.StaticHandler): + name = "dummy_2" + +class LazyCryptContextTest(TestCase): + descriptionPrefix = "LazyCryptContext" + + def setUp(self): + # make sure this isn't registered before OR after + unload_handler_name("dummy_2") + self.addCleanup(unload_handler_name, "dummy_2") + + # silence some warnings + warnings.filterwarnings("ignore", + r"CryptContext\(\)\.replace\(\) has been deprecated") + warnings.filterwarnings("ignore", ".*(CryptPolicy|context\.policy).*(has|have) been deprecated.*") + + def test_kwd_constructor(self): + "test plain kwds" + self.assertFalse(has_crypt_handler("dummy_2")) + register_crypt_handler_path("dummy_2", "passlib.tests.test_context") + + cc = LazyCryptContext(iter(["dummy_2", "des_crypt"]), deprecated=["des_crypt"]) + + self.assertFalse(has_crypt_handler("dummy_2", True)) + + self.assertTrue(cc.policy.handler_is_deprecated("des_crypt")) + self.assertEqual(cc.policy.schemes(), ["dummy_2", "des_crypt"]) + + self.assertTrue(has_crypt_handler("dummy_2", True)) + + def test_callable_constructor(self): + "test create_policy() hook, returning CryptPolicy" + self.assertFalse(has_crypt_handler("dummy_2")) + register_crypt_handler_path("dummy_2", "passlib.tests.test_context") + + def create_policy(flag=False): + self.assertTrue(flag) + return CryptPolicy(schemes=iter(["dummy_2", "des_crypt"]), deprecated=["des_crypt"]) + + cc = LazyCryptContext(create_policy=create_policy, flag=True) + + self.assertFalse(has_crypt_handler("dummy_2", True)) + + self.assertTrue(cc.policy.handler_is_deprecated("des_crypt")) + self.assertEqual(cc.policy.schemes(), ["dummy_2", "des_crypt"]) + + self.assertTrue(has_crypt_handler("dummy_2", True)) + +#========================================================= +#EOF +#========================================================= diff --git a/passlib/tests/test_ext_django.py b/passlib/tests/test_ext_django.py index 890c87b..0a8764f 100644 --- a/passlib/tests/test_ext_django.py +++ b/passlib/tests/test_ext_django.py @@ -9,7 +9,7 @@ import sys import warnings #site #pkg -from passlib.context import CryptContext, CryptPolicy +from passlib.context import CryptContext from passlib.apps import django_context from passlib.ext.django import utils from passlib.hash import sha256_crypt @@ -118,9 +118,9 @@ sample1_sha1 = 'sha1$b215d$9ee0a66f84ef1ad99096355e788135f7e949bd41' # context for testing category funcs category_context = CryptContext( schemes = [ "sha256_crypt" ], - sha256_crypt__rounds = 1000, - staff__sha256_crypt__rounds = 2000, - superuser__sha256_crypt__rounds = 3000, + sha256_crypt__default_rounds = 1000, + staff__sha256_crypt__default_rounds = 2000, + superuser__sha256_crypt__default_rounds = 3000, ) def get_cc_rounds(**kwds): @@ -258,7 +258,6 @@ class PatchTest(TestCase): def test_01_patch_bad_types(self): "test set_django_password_context bad inputs" set = utils.set_django_password_context - self.assertRaises(TypeError, set, CryptPolicy()) self.assertRaises(TypeError, set, "") def test_02_models_check_password(self): @@ -430,85 +429,70 @@ class PluginTest(TestCase): descriptionPrefix = "passlib.ext.django plugin" def setUp(self): - #remove django patch + super(PluginTest, self).setUp() + + # remove django patch now, and at end utils.set_django_password_context(None) + self.addCleanup(utils.set_django_password_context, None) - #ensure django settings are empty + # ensure django settings are empty update_settings( PASSLIB_CONTEXT=_NOTSET, PASSLIB_GET_CATEGORY=_NOTSET, ) - #unload module so it's re-run + # unload module so it's re-run when imported sys.modules.pop("passlib.ext.django.models", None) - def tearDown(self): - #remove django patch - utils.set_django_password_context(None) + def check_hashes(self, tests, default_scheme, deprecated=[], load=True): + """run through django api to verify patch is configured & functioning""" + # load extension if it hasn't been already. + if load: + import passlib.ext.django.models - def check_hashes(self, tests, new_hash=None, deprecated=None): - u = FakeUser() - deprecated = None + # create fake user object + user = FakeUser() - # check new hash construction - if new_hash: - u.set_password("placeholder") - handler = get_crypt_handler(new_hash) - self.assertTrue(handler.identify(u.password)) + # check new hashes constructed using default scheme + user.set_password("stub") + handler = get_crypt_handler(default_scheme) + self.assertTrue(handler.identify(user.password), + "handler failed to identify hash: %r %r" % + (default_scheme, user.password)) # run against hashes from tests... for test in tests: for secret, hash in test.iter_known_hashes(): # check against valid password - u.password = hash + user.password = hash if has_django0 and isinstance(secret, unicode): secret = secret.encode("utf-8") - self.assertTrue(u.check_password(secret)) - if new_hash and deprecated and test.handler.name in deprecated: + self.assertTrue(user.check_password(secret)) + if deprecated and test.handler.name in deprecated: self.assertFalse(handler.identify(hash)) - self.assertTrue(handler.identify(u.password)) + self.assertTrue(handler.identify(user.password)) # check against invalid password - u.password = hash - self.assertFalse(u.check_password('x'+secret)) - if new_hash and deprecated and test.handler.name in deprecated: + user.password = hash + self.assertFalse(user.check_password('x'+secret)) + if deprecated and test.handler.name in deprecated: self.assertFalse(handler.identify(hash)) - self.assertEqual(u.password, hash) + self.assertEqual(user.password, hash) # check disabled handling if has_django1: - u.set_password(None) + user.set_password(None) handler = get_crypt_handler("django_disabled") - self.assertTrue(handler.identify(u.password)) - self.assertFalse(u.check_password('placeholder')) + self.assertTrue(handler.identify(user.password)) + self.assertFalse(user.check_password('placeholder')) - def test_00_actual_django(self): - "test actual Django behavior has not changed" - #NOTE: if this test fails, - # probably means newer version of Django, - # and passlib's policies should be updated. + def check_django_stock(self, load=True): self.check_hashes(django_hash_tests, "django_salted_sha1", - ["hex_md5"]) - - def test_01_explicit_unset(self, value=None): - "test PASSLIB_CONTEXT = None" - update_settings( - PASSLIB_CONTEXT=value, - ) - import passlib.ext.django.models - self.check_hashes(django_hash_tests, - "django_salted_sha1", - ["hex_md5"]) - - def test_02_stock_ctx(self): - "test PASSLIB_CONTEXT = utils.STOCK_CTX" - self.test_01_explicit_unset(value=utils.STOCK_CTX) + ["hex_md5"], load=load) - def test_03_implicit_default_ctx(self): - "test PASSLIB_CONTEXT unset" - import passlib.ext.django.models + def check_passlib_stock(self): self.check_hashes(default_hash_tests, "sha512_crypt", ["hex_md5", "django_salted_sha1", @@ -516,24 +500,46 @@ class PluginTest(TestCase): "django_des_crypt", ]) - def test_04_explicit_default_ctx(self): + def test_10_django(self): + "test actual Django behavior has not changed" + #NOTE: if this test fails, + # probably means newer version of Django, + # and passlib's policies should be updated. + self.check_django_stock(load=False) + + def test_11_none(self): + "test PASSLIB_CONTEXT=None" + update_settings(PASSLIB_CONTEXT=None) + self.check_django_stock(load=False) + + def test_12_string(self): + "test PASSLIB_CONTEXT=string" + update_settings(PASSLIB_CONTEXT=utils.STOCK_CTX) + self.check_django_stock(load=False) + + def test_13_unset(self): + "test unset PASSLIB_CONTEXT uses default" + self.check_passlib_stock() + + def test_14_default(self): "test PASSLIB_CONTEXT = utils.DEFAULT_CTX" - update_settings( - PASSLIB_CONTEXT=utils.DEFAULT_CTX, - ) - self.test_03_implicit_default_ctx() + update_settings(PASSLIB_CONTEXT=utils.DEFAULT_CTX) + self.check_passlib_stock() - def test_05_default_ctx_alias(self): + def test_15_default_alias(self): "test PASSLIB_CONTEXT = 'passlib-default'" - update_settings( - PASSLIB_CONTEXT="passlib-default", - ) - self.test_03_implicit_default_ctx() + update_settings(PASSLIB_CONTEXT="passlib-default") + self.check_passlib_stock() + + def test_16_invalid(self): + "test PASSLIB_CONTEXT = invalid type" + update_settings(PASSLIB_CONTEXT=123) + self.assertRaises(TypeError, __import__, 'passlib.ext.django.models') - def test_06_categories(self): + def test_20_categories(self): "test PASSLIB_GET_CATEGORY unset" update_settings( - PASSLIB_CONTEXT=category_context.policy.to_string(), + PASSLIB_CONTEXT=category_context.to_string(), ) import passlib.ext.django.models @@ -541,12 +547,12 @@ class PluginTest(TestCase): self.assertEqual(get_cc_rounds(is_staff=True), 2000) self.assertEqual(get_cc_rounds(is_superuser=True), 3000) - def test_07_categories_explicit(self): + def test_21_categories_explicit(self): "test PASSLIB_GET_CATEGORY = function" def get_category(user): return user.first_name or None update_settings( - PASSLIB_CONTEXT = category_context.policy.to_string(), + PASSLIB_CONTEXT = category_context.to_string(), PASSLIB_GET_CATEGORY = get_category, ) import passlib.ext.django.models @@ -556,10 +562,10 @@ class PluginTest(TestCase): self.assertEqual(get_cc_rounds(first_name='staff'), 2000) self.assertEqual(get_cc_rounds(first_name='superuser'), 3000) - def test_08_categories_disabled(self): + def test_22_categories_disabled(self): "test PASSLIB_GET_CATEGORY = None" update_settings( - PASSLIB_CONTEXT = category_context.policy.to_string(), + PASSLIB_CONTEXT = category_context.to_string(), PASSLIB_GET_CATEGORY = None, ) import passlib.ext.django.models diff --git a/passlib/tests/test_handlers.py b/passlib/tests/test_handlers.py index f00e1cf..61cdc8f 100644 --- a/passlib/tests/test_handlers.py +++ b/passlib/tests/test_handlers.py @@ -201,49 +201,55 @@ class _bcrypt_test(HandlerCase): #=============================================================== # fuzz testing #=============================================================== - def get_fuzz_verifiers(self): - verifiers = super(_bcrypt_test, self).get_fuzz_verifiers() - - # test other backends against py-bcrypt if available + def os_supports_ident(self, hash): + "check if OS crypt is expected to support given ident" + if hash is None: + return True + # most OSes won't support 2x/2y + # XXX: definitely not the BSDs, but what about the linux variants? + if hash.startswith("$2x$") or hash.startswith("$2y$"): + return False + return True + + def fuzz_verifier_pybcrypt(self): + # test against py-bcrypt if available from passlib.utils import to_native_str try: from bcrypt import hashpw except ImportError: - pass - else: - def check_pybcrypt(secret, hash): - "pybcrypt" - secret = to_native_str(secret, self.fuzz_password_encoding) - if hash.startswith("$2y$"): - hash = "$2a$" + hash[4:] - try: - return hashpw(secret, hash) == hash - except ValueError: - raise ValueError("py-bcrypt rejected hash: %r" % (hash,)) - verifiers.append(check_pybcrypt) - - # test other backends against bcryptor if available + return + def check_pybcrypt(secret, hash): + "pybcrypt" + secret = to_native_str(secret, self.fuzz_password_encoding) + if hash.startswith("$2y$"): + hash = "$2a$" + hash[4:] + try: + return hashpw(secret, hash) == hash + except ValueError: + raise ValueError("py-bcrypt rejected hash: %r" % (hash,)) + return check_pybcrypt + + def fuzz_verifier_bcryptor(self): + # test against bcryptor if available + from passlib.utils import to_native_str try: from bcryptor.engine import Engine except ImportError: - pass - else: - def check_bcryptor(secret, hash): - "bcryptor" - secret = to_native_str(secret, self.fuzz_password_encoding) - if hash.startswith("$2y$"): - hash = "$2a$" + hash[4:] - elif hash.startswith("$2$"): - # bcryptor doesn't support $2$ hashes; but we can fake it - # using the $2a$ algorithm, by repeating the password until - # it's 72 chars in length. - hash = "$2a$" + hash[3:] - if secret: - secret = repeat_string(secret, 72) - return Engine(False).hash_key(secret, hash) == hash - verifiers.append(check_bcryptor) - - return verifiers + return + def check_bcryptor(secret, hash): + "bcryptor" + secret = to_native_str(secret, self.fuzz_password_encoding) + if hash.startswith("$2y$"): + hash = "$2a$" + hash[4:] + elif hash.startswith("$2$"): + # bcryptor doesn't support $2$ hashes; but we can fake it + # using the $2a$ algorithm, by repeating the password until + # it's 72 chars in length. + hash = "$2a$" + hash[3:] + if secret: + secret = repeat_string(secret, 72) + return Engine(False).hash_key(secret, hash) == hash + return check_bcryptor def get_fuzz_rounds(self): # decrease default rounds for fuzz testing to speed up volume. @@ -254,6 +260,8 @@ class _bcrypt_test(HandlerCase): if ident == u("$2x$"): # just recognized, not currently supported. return None + if self.backend == "os_crypt" and not self.using_patched_crypt and not self.os_supports_ident(ident): + return None return ident #=============================================================== @@ -421,13 +429,17 @@ class _bsdi_crypt_test(HandlerCase): platform_crypt_support = dict( freebsd=True, - openbsd=False, + openbsd=True, netbsd=True, linux=False, solaris=False, # darwin ? ) + def setUp(self): + super(_bsdi_crypt_test, self).setUp() + warnings.filterwarnings("ignore", "bsdi_crypt rounds should be odd.*") + os_crypt_bsdi_crypt_test = create_backend_case(_bsdi_crypt_test, "os_crypt") builtin_bsdi_crypt_test = create_backend_case(_bsdi_crypt_test, "builtin") @@ -497,6 +509,7 @@ class cisco_pix_test(UserHandlerMixin, HandlerCase): class cisco_type7_test(HandlerCase): handler = hash.cisco_type7 salt_bits = 4 + salt_type = int known_correct_hashes = [ # @@ -539,6 +552,41 @@ class cisco_type7_test(HandlerCase): (UPASS_TABLE, '0958EDC8A9F495F6F8A5FD'), ] + known_unidentified_hashes = [ + # salt with hex value + "0A480E051A33490E", + + # salt value > 52. this may in fact be valid, but we reject it for now + # (see docs for more). + '99400E4812', + ] + + def test_90_decode(self): + "test cisco_type7.decode()" + from passlib.utils import to_unicode, to_bytes + + handler = self.handler + for secret, hash in self.known_correct_hashes: + usecret = to_unicode(secret) + bsecret = to_bytes(secret) + self.assertEqual(handler.decode(hash), usecret) + self.assertEqual(handler.decode(hash, None), bsecret) + + self.assertRaises(UnicodeDecodeError, handler.decode, + '0958EDC8A9F495F6F8A5FD', 'ascii') + + def test_91_salt(self): + "test salt value border cases" + handler = self.handler + self.assertRaises(TypeError, handler, salt=None) + handler(salt=None, use_defaults=True) + self.assertRaises(TypeError, handler, salt='abc') + self.assertRaises(ValueError, handler, salt=-10) + with catch_warnings(record=True) as wlog: + h = handler(salt=100, relaxed=True) + self.consumeWarningList(wlog, ["salt/offset must be.*"]) + self.assertEqual(h.salt, 52) + #========================================================= # crypt16 #========================================================= @@ -603,6 +651,10 @@ class _des_crypt_test(HandlerCase): # bad char in otherwise correctly formatted hash #\/ '!gAwTx2l6NADI', + + # wrong size + 'OgAwTx2l6NAD', + 'OgAwTx2l6NADIj', ] platform_crypt_support = dict( @@ -627,18 +679,15 @@ class _DjangoHelper(object): # NOTE: not testing against Django < 1.0 since it doesn't support # most of these hash formats. - def get_fuzz_verifiers(self): - verifiers = super(_DjangoHelper, self).get_fuzz_verifiers() - + def fuzz_verifier_django(self): from passlib.tests.test_ext_django import has_django1 - if has_django1: - from django.contrib.auth.models import check_password - def verify_django(secret, hash): - "django check_password()" - return check_password(secret, hash) - verifiers.append(verify_django) - - return verifiers + if not has_django1: + return None + from django.contrib.auth.models import check_password + def verify_django(secret, hash): + "django check_password()" + return check_password(secret, hash) + return verify_django def test_90_django_reference(self): "run known correct hashes through Django's check_password()" @@ -794,6 +843,9 @@ class fshp_test(HandlerCase): ] known_malformed_hashes = [ + # bad base64 padding + '{FSHP0|0|1}qUqP5cyxm6YcTAhz05Hph5gvu9M', + # wrong salt size '{FSHP0|1|1}qUqP5cyxm6YcTAhz05Hph5gvu9M=', @@ -801,6 +853,32 @@ class fshp_test(HandlerCase): '{FSHP0|0|A}qUqP5cyxm6YcTAhz05Hph5gvu9M=', ] + def test_90_variant(self): + "test variant keyword" + handler = self.handler + kwds = dict(salt=b('a'), rounds=1) + + # accepts ints + handler(variant=1, **kwds) + + # accepts bytes or unicode + handler(variant=u('1'), **kwds) + handler(variant=b('1'), **kwds) + + # aliases + handler(variant=u('sha256'), **kwds) + handler(variant=b('sha256'), **kwds) + + # rejects None + self.assertRaises(TypeError, handler, variant=None, **kwds) + + # rejects other types + self.assertRaises(TypeError, handler, variant=complex(1,1), **kwds) + + # invalid variant + self.assertRaises(ValueError, handler, variant='9', **kwds) + self.assertRaises(ValueError, handler, variant=9, **kwds) + #========================================================= #hex digests #========================================================= @@ -844,6 +922,37 @@ class hex_sha512_test(HandlerCase): ] #========================================================= +# htdigest hash +#========================================================= +class htdigest_test(UserHandlerMixin, HandlerCase): + handler = hash.htdigest + + known_correct_hashes = [ + # secret, user, realm + + # from RFC 2617 + (("Circle Of Life", "Mufasa", "testrealm@host.com"), + '939e7578ed9e3c518a452acee763bce9'), + + # custom + ((UPASS_TABLE, UPASS_USD, UPASS_WAV), + '4dabed2727d583178777fab468dd1f17'), + ] + + def test_80_user(self): + raise self.skipTest("test case doesn't support 'realm' keyword") + + def _insert_user(self, kwds, secret): + "insert username into kwds" + if isinstance(secret, tuple): + secret, user, realm = secret + else: + user, realm = "user", "realm" + kwds.setdefault("user", user) + kwds.setdefault("realm", realm) + return secret + +#========================================================= #ldap hashes #========================================================= class ldap_md5_test(HandlerCase): @@ -1080,6 +1189,9 @@ class _md5_crypt_test(HandlerCase): known_malformed_hashes = [ # bad char in otherwise correct hash \/ '$1$dOHYPKoP$tnxS1T8Q6VVn3kpV8cN6o!', + + # too many fields + '$1$dOHYPKoP$tnxS1T8Q6VVn3kpV8cN6o.$', ] platform_crypt_support = dict( @@ -1789,6 +1901,13 @@ class scram_test(HandlerCase): # bad char in digest ---\/ '$scram$4096$QSXCR.Q6sek8bf92$sha-1=HZbuOlKbWl.eR8AfIposuKbhX3-', + # missing sections + '$scram$4096$QSXCR.Q6sek8bf92', + '$scram$4096$QSXCR.Q6sek8bf92$', + + # too many sections + '$scram$4096$QSXCR.Q6sek8bf92$sha-1=HZbuOlKbWl.eR8AfIposuKbhX30$', + # missing separator '$scram$4096$QSXCR.Q6sek8bf92$sha-1=HZbuOlKbWl.eR8AfIposuKbhX30' 'sha-256=qXUXrlcvnaxxWG00DdRgVioR2gnUpuX5r.3EZ1rdhVY', @@ -1800,11 +1919,17 @@ class scram_test(HandlerCase): # missing sha-1 alg '$scram$4096$QSXCR.Q6sek8bf92$sha-256=HZbuOlKbWl.eR8AfIposuKbhX30', + # non-iana name + '$scram$4096$QSXCR.Q6sek8bf92$sha1=HZbuOlKbWl.eR8AfIposuKbhX30', ] - # silence norm_hash_name() warning def setUp(self): super(scram_test, self).setUp() + + # some platforms lack stringprep (e.g. Jython, IronPython) + self.require_stringprep() + + # silence norm_hash_name() warning warnings.filterwarnings("ignore", r"norm_hash_name\(\): unknown hash") def test_90_algs(self): @@ -1858,6 +1983,10 @@ class scram_test(HandlerCase): 'sha-1=HZbuOlKbWl.eR8AfIposuKbhX30'), ["sha-1"]) self.assertEqual(eda('$scram$4096$QSXCR.Q6sek8bf92$' + 'sha-1=HZbuOlKbWl.eR8AfIposuKbhX30', format="hashlib"), + ["sha1"]) + + self.assertEqual(eda('$scram$4096$QSXCR.Q6sek8bf92$' 'sha-1=HZbuOlKbWl.eR8AfIposuKbhX30,' 'sha-256=qXUXrlcvnaxxWG00DdRgVioR2gnUpuX5r.3EZ1rdhVY,' 'sha-512=lzgniLFcvglRLS0gt.C4gy.NurS3OIOVRAU1zZOV4P.qFiVFO2/' @@ -1887,6 +2016,9 @@ class scram_test(HandlerCase): # check rounds self.assertRaises(ValueError, hash, "IX", s1, 0, 'sha-1') + # bad types + self.assertRaises(TypeError, hash, "IX", u('\x01'), 1000, 'md5') + def test_94_saslprep(self): "test encrypt/verify use saslprep" # NOTE: this just does a light test that saslprep() is being @@ -1917,14 +2049,16 @@ class scram_test(HandlerCase): self.assertEqual(handler.extract_digest_algs(h), ["md5", "sha-1"]) self.assertFalse(c1.hash_needs_update(h)) - c2 = c1.replace(scram__algs="sha1") + c2 = c1.copy(scram__algs="sha1") self.assertFalse(c2.hash_needs_update(h)) - c2 = c1.replace(scram__algs="sha1,sha256") + c2 = c1.copy(scram__algs="sha1,sha256") self.assertTrue(c2.hash_needs_update(h)) def test_96_full_verify(self): "test verify(full=True) flag" + def vpart(s, h): + return self.handler.verify(s, h) def vfull(s, h): return self.handler.verify(s, h, full=True) @@ -1953,12 +2087,16 @@ class scram_test(HandlerCase): 'edGQSu/kD1LwdX0SNV/KsPdHSwEl5qRTuZQ') self.assertRaises(ValueError, vfull, 'pencil', h) - # catch digests belonging to diff passwords. + # catch hash containing digests belonging to diff passwords. + # proper behavior for quick-verify (the default) is undefined, + # but full-verify should throw error. h = ('$scram$4096$QSXCR.Q6sek8bf92$' - 'sha-1=HZbuOlKbWl.eR8AfIposuKbhX30,' - 'sha-256=R7RJDWIbeKRTFwhE9oxh04kab0CllrQ3kCcpZUcligc' # 'tape' - 'sha-512=lzgniLFcvglRLS0gt.C4gy.NurS3OIOVRAU1zZOV4P.qFiVFO2/' + 'sha-1=HZbuOlKbWl.eR8AfIposuKbhX30,' # 'pencil' + 'sha-256=R7RJDWIbeKRTFwhE9oxh04kab0CllrQ3kCcpZUcligc,' # 'tape' + 'sha-512=lzgniLFcvglRLS0gt.C4gy.NurS3OIOVRAU1zZOV4P.qFiVFO2/' # 'pencil' 'edGQSu/kD1LwdX0SNV/KsPdHSwEl5qRTuZQ') + self.assertTrue(vpart('tape', h)) + self.assertFalse(vpart('pencil', h)) self.assertRaises(ValueError, vfull, 'pencil', h) self.assertRaises(ValueError, vfull, 'tape', h) @@ -1983,6 +2121,12 @@ class _sha1_crypt_test(HandlerCase): # zero padded rounds '$sha1$01773$uV7PTeux$I9oHnvwPZHMO0Nq6/WgyGV/tDJIH', + + # too many fields + '$sha1$21773$uV7PTeux$I9oHnvwPZHMO0Nq6/WgyGV/tDJIH$', + + # empty rounds field + '$sha1$$uV7PTeux$I9oHnvwPZHMO0Nq6/WgyGV/tDJIH$', ] platform_crypt_support = dict( @@ -2294,6 +2438,14 @@ class sun_md5_crypt_test(HandlerCase): ] known_malformed_hashes = [ + # unexpected end of hash + "$md5,rounds=5000", + + # bad rounds + "$md5,rounds=500A$xxxx", + "$md5,rounds=0500$xxxx", + "$md5,rounds=0$xxxx", + # bad char in otherwise correct hash "$md5$RPgL!6IJ$WTvAlUJ7MqH5xak2FMEwS/", @@ -2347,6 +2499,16 @@ class unix_disabled_test(HandlerCase): # TODO: test custom marker support # TODO: test default marker selection + def test_90_preserves_existing(self): + "test preserves existing disabled hash" + handler = self.handler + + # use marker if no hash + self.assertEqual(handler.genhash("stub", None), handler.marker) + + # use hash if provided and valid + self.assertEqual(handler.genhash("stub", "!asd"), "!asd") + class unix_fallback_test(HandlerCase): handler = hash.unix_fallback accepts_all_hashes = True diff --git a/passlib/tests/test_hosts.py b/passlib/tests/test_hosts.py index de744a8..a64fb30 100644 --- a/passlib/tests/test_hosts.py +++ b/passlib/tests/test_hosts.py @@ -72,7 +72,7 @@ class HostsTest(TestCase): # validate schemes is non-empty, # and contains unix_disabled + at least one real scheme - schemes = ctx.policy.schemes() + schemes = list(ctx.schemes()) self.assertTrue(schemes, "appears to be unix system, but no known schemes supported by crypt") self.assertTrue('unix_disabled' in schemes) schemes.remove("unix_disabled") diff --git a/passlib/tests/test_registry.py b/passlib/tests/test_registry.py index 1990919..3f18271 100644 --- a/passlib/tests/test_registry.py +++ b/passlib/tests/test_registry.py @@ -126,6 +126,8 @@ class RegistryTest(TestCase): self.assertRaises(ValueError, register_crypt_handler, type('x', (uh.StaticHandler,), dict(name=None))) self.assertRaises(ValueError, register_crypt_handler, type('x', (uh.StaticHandler,), dict(name="AB_CD"))) self.assertRaises(ValueError, register_crypt_handler, type('x', (uh.StaticHandler,), dict(name="ab-cd"))) + self.assertRaises(ValueError, register_crypt_handler, type('x', (uh.StaticHandler,), dict(name="ab__cd"))) + self.assertRaises(ValueError, register_crypt_handler, type('x', (uh.StaticHandler,), dict(name="default"))) class dummy_1(uh.StaticHandler): name = "dummy_1" diff --git a/passlib/tests/test_utils.py b/passlib/tests/test_utils.py index 678ae06..d317629 100644 --- a/passlib/tests/test_utils.py +++ b/passlib/tests/test_utils.py @@ -26,6 +26,60 @@ class MiscTest(TestCase): #NOTE: could test xor_bytes(), but it's exercised well enough by pbkdf2 test + def test_classproperty(self): + from passlib.utils import classproperty + + class test(object): + xvar = 1 + @classproperty + def xprop(cls): + return cls.xvar + + self.assertEqual(test.xprop, 1) + prop = test.__dict__['xprop'] + self.assertIs(prop.im_func, prop.__func__) + + def test_deprecated_function(self): + from passlib.utils import deprecated_function + # NOTE: not comprehensive, just tests the basic behavior + + @deprecated_function(deprecated="1.6", removed="1.8") + def test_func(*args): + "test docstring" + return args + + self.assertTrue(".. deprecated::" in test_func.__doc__) + + with catch_warnings(record=True) as wlog: + self.assertEqual(test_func(1,2), (1,2)) + self.consumeWarningList(wlog,[ + dict(category=DeprecationWarning, + message="the function passlib.tests.test_utils.test_func() " + "is deprecated as of Passlib 1.6, and will be " + "removed in Passlib 1.8." + ), + ]) + + def test_memoized_property(self): + from passlib.utils import memoized_property + + class dummy(object): + counter = 0 + + @memoized_property + def value(self): + value = self.counter + self.counter = value+1 + return value + + d = dummy() + self.assertEqual(d.value, 0) + self.assertEqual(d.value, 0) + self.assertEqual(d.counter, 1) + + prop = dummy.value + self.assertIs(prop.im_func, prop.__func__) + def test_getrandbytes(self): "test getrandbytes()" from passlib.utils import getrandbytes, rng @@ -241,13 +295,11 @@ class MiscTest(TestCase): def test_saslprep(self): "test saslprep() unicode normalizer" - from passlib.utils import saslprep as sp, _ipy_missing_stringprep - + self.require_stringprep() + from passlib.utils.compat import IRONPYTHON if IRONPYTHON: - self.assertTrue(_ipy_missing_stringprep, - "alert passlib author that IPY has stringprep support!") - self.assertRaises(NotImplementedError, sp, u('abc')) - raise self.skipTest("stringprep missing under IPY") + warn("IronPython now has stringprep support!") + from passlib.utils import saslprep as sp # invalid types self.assertRaises(TypeError, sp, None) @@ -290,6 +342,8 @@ class MiscTest(TestCase): # unassigned code points (as of unicode 3.2) self.assertRaises(ValueError, sp, u("\u0900")) self.assertRaises(ValueError, sp, u("\uFFF8")) + # tagging characters + self.assertRaises(ValueError, sp, u("\U000e0001")) # verify bidi behavior # if starts with R/AL -- must end with R/AL @@ -450,86 +504,32 @@ class CodecTest(TestCase): self.assertFalse(is_same_codec("ascii", "utf-8")) #========================================================= -#test des module +# base64engine #========================================================= -import passlib.utils.des as des - -class DesTest(TestCase): - - #test vectors taken from http://www.skepticfiles.org/faq/testdes.htm - - #data is list of (key, plaintext, ciphertext), all as 64 bit hex string - test_des_vectors = [ - (line[4:20], line[21:37], line[38:54]) - for line in -b(""" 0000000000000000 0000000000000000 8CA64DE9C1B123A7 - FFFFFFFFFFFFFFFF FFFFFFFFFFFFFFFF 7359B2163E4EDC58 - 3000000000000000 1000000000000001 958E6E627A05557B - 1111111111111111 1111111111111111 F40379AB9E0EC533 - 0123456789ABCDEF 1111111111111111 17668DFC7292532D - 1111111111111111 0123456789ABCDEF 8A5AE1F81AB8F2DD - 0000000000000000 0000000000000000 8CA64DE9C1B123A7 - FEDCBA9876543210 0123456789ABCDEF ED39D950FA74BCC4 - 7CA110454A1A6E57 01A1D6D039776742 690F5B0D9A26939B - 0131D9619DC1376E 5CD54CA83DEF57DA 7A389D10354BD271 - 07A1133E4A0B2686 0248D43806F67172 868EBB51CAB4599A - 3849674C2602319E 51454B582DDF440A 7178876E01F19B2A - 04B915BA43FEB5B6 42FD443059577FA2 AF37FB421F8C4095 - 0113B970FD34F2CE 059B5E0851CF143A 86A560F10EC6D85B - 0170F175468FB5E6 0756D8E0774761D2 0CD3DA020021DC09 - 43297FAD38E373FE 762514B829BF486A EA676B2CB7DB2B7A - 07A7137045DA2A16 3BDD119049372802 DFD64A815CAF1A0F - 04689104C2FD3B2F 26955F6835AF609A 5C513C9C4886C088 - 37D06BB516CB7546 164D5E404F275232 0A2AEEAE3FF4AB77 - 1F08260D1AC2465E 6B056E18759F5CCA EF1BF03E5DFA575A - 584023641ABA6176 004BD6EF09176062 88BF0DB6D70DEE56 - 025816164629B007 480D39006EE762F2 A1F9915541020B56 - 49793EBC79B3258F 437540C8698F3CFA 6FBF1CAFCFFD0556 - 4FB05E1515AB73A7 072D43A077075292 2F22E49BAB7CA1AC - 49E95D6D4CA229BF 02FE55778117F12A 5A6B612CC26CCE4A - 018310DC409B26D6 1D9D5C5018F728C2 5F4C038ED12B2E41 - 1C587F1C13924FEF 305532286D6F295A 63FAC0D034D9F793 - 0101010101010101 0123456789ABCDEF 617B3A0CE8F07100 - 1F1F1F1F0E0E0E0E 0123456789ABCDEF DB958605F8C8C606 - E0FEE0FEF1FEF1FE 0123456789ABCDEF EDBFD1C66C29CCC7 - 0000000000000000 FFFFFFFFFFFFFFFF 355550B2150E2451 - FFFFFFFFFFFFFFFF 0000000000000000 CAAAAF4DEAF1DBAE - 0123456789ABCDEF 0000000000000000 D5D44FF720683D0D - FEDCBA9876543210 FFFFFFFFFFFFFFFF 2A2BB008DF97C2F2 - """).split(b("\n")) if line.strip() - ] +class Base64EngineTest(TestCase): + "test standalone parts of Base64Engine" + # NOTE: most Base64Engine testing done via _Base64Test subclasses below. - def test_des_encrypt_block(self): - for k,p,c in self.test_des_vectors: - k = unhexlify(k) - p = unhexlify(p) - c = unhexlify(c) - result = des.des_encrypt_block(k,p) - self.assertEqual(result, c, "key=%r p=%r:" % (k,p)) - - #test 7 byte key - #FIXME: use a better key - k,p,c = b('00000000000000'), b('FFFFFFFFFFFFFFFF'), b('355550B2150E2451') - k = unhexlify(k) - p = unhexlify(p) - c = unhexlify(c) - result = des.des_encrypt_block(k,p) - self.assertEqual(result, c, "key=%r p=%r:" % (k,p)) - - def test_mdes_encrypt_int_block(self): - for k,p,c in self.test_des_vectors: - k = int(k,16) - p = int(p,16) - c = int(c,16) - result = des.mdes_encrypt_int_block(k,p, salt=0, rounds=1) - self.assertEqual(result, c, "key=%r p=%r:" % (k,p)) - - #TODO: test other des methods (eg: mdes_encrypt_int_block w/ salt & rounds) - # though des-crypt builtin backend test should thump it well enough + def test_constructor(self): + from passlib.utils import Base64Engine, AB64_CHARS + + # bad charmap type + self.assertRaises(TypeError, Base64Engine, 1) + + # bad charmap size + self.assertRaises(ValueError, Base64Engine, AB64_CHARS[:-1]) + + # dup charmap letter + self.assertRaises(ValueError, Base64Engine, AB64_CHARS[:-1] + "A") + + def test_ab64(self): + from passlib.utils import ab64_decode + # TODO: make ab64_decode (and a b64 variant) *much* stricter about + # padding chars, etc. + + # 1 mod 4 not valid + self.assertRaises(ValueError, ab64_decode, "abcde") -#========================================================= -# base64engine -#========================================================= class _Base64Test(TestCase): "common tests for all Base64Engine instances" #========================================================= @@ -730,6 +730,8 @@ class _Base64Test(TestCase): out = engine.decode_bytes(tmp) self.assertEqual(out, result) + self.assertRaises(TypeError, engine.encode_transposed_bytes, u("a"), []) + def test_decode_transposed_bytes(self): "test decode_transposed_bytes()" engine = self.engine @@ -895,351 +897,5 @@ class H64Big_Test(_Base64Test): ] #========================================================= -#test md4 -#========================================================= -class _MD4_Test(TestCase): - #test vectors from http://www.faqs.org/rfcs/rfc1320.html - A.5 - - hash = None - - vectors = [ - # input -> hex digest - (b(""), "31d6cfe0d16ae931b73c59d7e0c089c0"), - (b("a"), "bde52cb31de33e46245e05fbdbd6fb24"), - (b("abc"), "a448017aaf21d8525fc10ae87aa6729d"), - (b("message digest"), "d9130a8164549fe818874806e1c7014b"), - (b("abcdefghijklmnopqrstuvwxyz"), "d79e1c308aa5bbcdeea8ed63df412da9"), - (b("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"), "043f8582f241db351ce627e153e7f0e4"), - (b("12345678901234567890123456789012345678901234567890123456789012345678901234567890"), "e33b4ddc9c38f2199c3e7b164fcc0536"), - ] - - def test_md4_update(self): - "test md4 update" - md4 = self.hash - h = md4(b('')) - self.assertEqual(h.hexdigest(), "31d6cfe0d16ae931b73c59d7e0c089c0") - - #NOTE: under py2, hashlib methods try to encode to ascii, - # though shouldn't rely on that. - if PY3: - self.assertRaises(TypeError, h.update, u('x')) - - h.update(b('a')) - self.assertEqual(h.hexdigest(), "bde52cb31de33e46245e05fbdbd6fb24") - - h.update(b('bcdefghijklmnopqrstuvwxyz')) - self.assertEqual(h.hexdigest(), "d79e1c308aa5bbcdeea8ed63df412da9") - - def test_md4_hexdigest(self): - "test md4 hexdigest()" - md4 = self.hash - for input, hex in self.vectors: - out = md4(input).hexdigest() - self.assertEqual(out, hex) - - def test_md4_digest(self): - "test md4 digest()" - md4 = self.hash - for input, hex in self.vectors: - out = bascii_to_str(hexlify(md4(input).digest())) - self.assertEqual(out, hex) - - def test_md4_copy(self): - "test md4 copy()" - md4 = self.hash - h = md4(b('abc')) - - h2 = h.copy() - h2.update(b('def')) - self.assertEqual(h2.hexdigest(), '804e7f1c2586e50b49ac65db5b645131') - - h.update(b('ghi')) - self.assertEqual(h.hexdigest(), 'c5225580bfe176f6deeee33dee98732c') - -# -#now do a bunch of things to test multiple possible backends. -# -import passlib.utils.md4 as md4_mod - -has_ssl_md4 = (md4_mod.md4 is not md4_mod._builtin_md4) - -if has_ssl_md4: - class MD4_SSL_Test(_MD4_Test): - descriptionPrefix = "MD4 (SSL version)" - hash = staticmethod(md4_mod.md4) - -if not has_ssl_md4 or enable_option("cover"): - class MD4_Builtin_Test(_MD4_Test): - descriptionPrefix = "MD4 (builtin version)" - hash = md4_mod._builtin_md4 - -#========================================================= -#test passlib.utils.pbkdf2 -#========================================================= -import hashlib -import hmac -from passlib.utils import pbkdf2 - -#TODO: should we bother testing hmac_sha1() function? it's verified via sha1_crypt testing. -class CryptoTest(TestCase): - "test various crypto functions" - - ndn_formats = ["hashlib", "iana"] - ndn_values = [ - # (iana name, hashlib name, ... other unnormalized names) - ("md5", "md5", "SCRAM-MD5-PLUS", "MD-5"), - ("sha1", "sha-1", "SCRAM-SHA-1", "SHA1"), - ("sha256", "sha-256", "SHA_256", "sha2-256"), - ("ripemd", "ripemd", "SCRAM-RIPEMD", "RIPEMD"), - ("ripemd160", "ripemd-160", - "SCRAM-RIPEMD-160", "RIPEmd160"), - ("test128", "test-128", "TEST128"), - ("test2", "test2", "TEST-2"), - ("test3128", "test3-128", "TEST-3-128"), - ] - - def test_norm_hash_name(self): - "test norm_hash_name()" - from itertools import chain - from passlib.utils.pbkdf2 import norm_hash_name, _nhn_hash_names - - # test formats - for format in self.ndn_formats: - norm_hash_name("md4", format) - self.assertRaises(ValueError, norm_hash_name, "md4", None) - self.assertRaises(ValueError, norm_hash_name, "md4", "fake") - - # test types - self.assertEqual(norm_hash_name(u("MD4")), "md4") - self.assertEqual(norm_hash_name(b("MD4")), "md4") - self.assertRaises(TypeError, norm_hash_name, None) - - # test selected results - with catch_warnings(): - warnings.filterwarnings("ignore", '.*unknown hash') - for row in chain(_nhn_hash_names, self.ndn_values): - for idx, format in enumerate(self.ndn_formats): - correct = row[idx] - for value in row: - result = norm_hash_name(value, format) - self.assertEqual(result, correct, - "name=%r, format=%r:" % (value, - format)) - -class KdfTest(TestCase): - "test kdf helpers" - - def test_pbkdf1(self): - "test pbkdf1" - for secret, salt, rounds, klen, hash, correct in [ - #http://www.di-mgt.com.au/cryptoKDFs.html - (b('password'), hb('78578E5A5D63CB06'), 1000, 16, 'sha1', - hb('dc19847e05c64d2faf10ebfb4a3d2a20')), - ]: - result = pbkdf2.pbkdf1(secret, salt, rounds, klen, hash) - self.assertEqual(result, correct) - - #test rounds < 1 - #test klen < 0 - #test klen > block size - #test invalid hash - -#NOTE: this is not run directly, but via two subclasses (below) -class _Pbkdf2BackendTest(TestCase): - "test builtin unix crypt backend" - enable_m2crypto = False - - def setUp(self): - #disable m2crypto support so we'll always use software backend - if not self.enable_m2crypto: - self._orig_EVP = pbkdf2._EVP - pbkdf2._EVP = None - else: - #set flag so tests can check for m2crypto presence quickly - self.enable_m2crypto = bool(pbkdf2._EVP) - pbkdf2._clear_prf_cache() - - def tearDown(self): - if not self.enable_m2crypto: - pbkdf2._EVP = self._orig_EVP - pbkdf2._clear_prf_cache() - - #TODO: test get_prf() behavior in various situations - though overall behavior tested via pbkdf2 - - def test_rfc3962(self): - "rfc3962 test vectors" - self.assertFunctionResults(pbkdf2.pbkdf2, [ - # result, secret, salt, rounds, keylen, digest="sha1" - - #test case 1 / 128 bit - ( - hb("cdedb5281bb2f801565a1122b2563515"), - b("password"), b("ATHENA.MIT.EDUraeburn"), 1, 16 - ), - - #test case 2 / 128 bit - ( - hb("01dbee7f4a9e243e988b62c73cda935d"), - b("password"), b("ATHENA.MIT.EDUraeburn"), 2, 16 - ), - - #test case 2 / 256 bit - ( - hb("01dbee7f4a9e243e988b62c73cda935da05378b93244ec8f48a99e61ad799d86"), - b("password"), b("ATHENA.MIT.EDUraeburn"), 2, 32 - ), - - #test case 3 / 256 bit - ( - hb("5c08eb61fdf71e4e4ec3cf6ba1f5512ba7e52ddbc5e5142f708a31e2e62b1e13"), - b("password"), b("ATHENA.MIT.EDUraeburn"), 1200, 32 - ), - - #test case 4 / 256 bit - ( - hb("d1daa78615f287e6a1c8b120d7062a493f98d203e6be49a6adf4fa574b6e64ee"), - b("password"), b('\x12\x34\x56\x78\x78\x56\x34\x12'), 5, 32 - ), - - #test case 5 / 256 bit - ( - hb("139c30c0966bc32ba55fdbf212530ac9c5ec59f1a452f5cc9ad940fea0598ed1"), - b("X"*64), b("pass phrase equals block size"), 1200, 32 - ), - - #test case 6 / 256 bit - ( - hb("9ccad6d468770cd51b10e6a68721be611a8b4d282601db3b36be9246915ec82a"), - b("X"*65), b("pass phrase exceeds block size"), 1200, 32 - ), - ]) - - def test_rfc6070(self): - "rfc6070 test vectors" - self.assertFunctionResults(pbkdf2.pbkdf2, [ - - ( - hb("0c60c80f961f0e71f3a9b524af6012062fe037a6"), - b("password"), b("salt"), 1, 20, - ), - - ( - hb("ea6c014dc72d6f8ccd1ed92ace1d41f0d8de8957"), - b("password"), b("salt"), 2, 20, - ), - - ( - hb("4b007901b765489abead49d926f721d065a429c1"), - b("password"), b("salt"), 4096, 20, - ), - - #just runs too long - could enable if ALL option is set - ##( - ## - ## unhexlify("eefe3d61cd4da4e4e9945b3d6ba2158c2634e984"), - ## "password", "salt", 16777216, 20, - ##), - - ( - hb("3d2eec4fe41c849b80c8d83662c0e44a8b291a964cf2f07038"), - b("passwordPASSWORDpassword"), - b("saltSALTsaltSALTsaltSALTsaltSALTsalt"), - 4096, 25, - ), - - ( - hb("56fa6aa75548099dcc37d7f03425e0c3"), - b("pass\00word"), b("sa\00lt"), 4096, 16, - ), - ]) - - def test_invalid_values(self): - - #invalid rounds - self.assertRaises(ValueError, pbkdf2.pbkdf2, b('password'), b('salt'), -1, 16) - self.assertRaises(ValueError, pbkdf2.pbkdf2, b('password'), b('salt'), 0, 16) - self.assertRaises(TypeError, pbkdf2.pbkdf2, b('password'), b('salt'), 'x', 16) - - #invalid keylen - self.assertRaises(ValueError, pbkdf2.pbkdf2, b('password'), b('salt'), - 1, 20*(2**32-1)+1) - - #invalid salt type - self.assertRaises(TypeError, pbkdf2.pbkdf2, b('password'), 5, 1, 10) - - #invalid secret type - self.assertRaises(TypeError, pbkdf2.pbkdf2, 5, b('salt'), 1, 10) - - #invalid hash - self.assertRaises(ValueError, pbkdf2.pbkdf2, b('password'), b('salt'), 1, 16, 'hmac-foo') - self.assertRaises(ValueError, pbkdf2.pbkdf2, b('password'), b('salt'), 1, 16, 'foo') - self.assertRaises(TypeError, pbkdf2.pbkdf2, b('password'), b('salt'), 1, 16, 5) - - def test_default_keylen(self): - "test keylen==-1" - self.assertEqual(len(pbkdf2.pbkdf2(b('password'), b('salt'), 1, -1, - prf='hmac-sha1')), 20) - - self.assertEqual(len(pbkdf2.pbkdf2(b('password'), b('salt'), 1, -1, - prf='hmac-sha256')), 32) - - def test_hmac_sha1(self): - "test independant hmac_sha1() method" - self.assertEqual( - pbkdf2.hmac_sha1(b("secret"), b("salt")), - b('\xfc\xd4\x0c;]\r\x97\xc6\xf1S\x8d\x93\xb9\xeb\xc6\x00\x04.\x8b\xfe') - ) - - def test_sha1_string(self): - "test various prf values" - self.assertEqual( - pbkdf2.pbkdf2(b("secret"), b("salt"), 10, 16, "hmac-sha1"), - b('\xe2H\xfbk\x136QF\xf8\xacc\x07\xcc"(\x12') - ) - - def test_sha512_string(self): - "test alternate digest string (sha512)" - self.assertFunctionResults(pbkdf2.pbkdf2, [ - # result, secret, salt, rounds, keylen, digest="sha1" - - #case taken from example in http://grub.enbug.org/Authentication - ( - hb("887CFF169EA8335235D8004242AA7D6187A41E3187DF0CE14E256D85ED97A97357AAA8FF0A3871AB9EEFF458392F462F495487387F685B7472FC6C29E293F0A0"), - b("hello"), - hb("9290F727ED06C38BA4549EF7DE25CF5642659211B7FC076F2D28FEFD71784BB8D8F6FB244A8CC5C06240631B97008565A120764C0EE9C2CB0073994D79080136"), - 10000, 64, "hmac-sha512" - ), - ]) - - def test_sha512_function(self): - "test custom digest function" - def prf(key, msg): - return hmac.new(key, msg, hashlib.sha512).digest() - - self.assertFunctionResults(pbkdf2.pbkdf2, [ - # result, secret, salt, rounds, keylen, digest="sha1" - - #case taken from example in http://grub.enbug.org/Authentication - ( - hb("887CFF169EA8335235D8004242AA7D6187A41E3187DF0CE14E256D85ED97A97357AAA8FF0A3871AB9EEFF458392F462F495487387F685B7472FC6C29E293F0A0"), - b("hello"), - hb("9290F727ED06C38BA4549EF7DE25CF5642659211B7FC076F2D28FEFD71784BB8D8F6FB244A8CC5C06240631B97008565A120764C0EE9C2CB0073994D79080136"), - 10000, 64, prf, - ), - ]) - -has_m2crypto = (pbkdf2._EVP is not None) - -if has_m2crypto: - class Pbkdf2_M2Crypto_Test(_Pbkdf2BackendTest): - descriptionPrefix = "pbkdf2 (m2crypto backend)" - enable_m2crypto = True - -if not has_m2crypto or enable_option("cover"): - class Pbkdf2_Builtin_Test(_Pbkdf2BackendTest): - descriptionPrefix = "pbkdf2 (builtin backend)" - enable_m2crypto = False - -#========================================================= #EOF #========================================================= diff --git a/passlib/tests/test_utils_crypto.py b/passlib/tests/test_utils_crypto.py new file mode 100644 index 0000000..94c20e8 --- /dev/null +++ b/passlib/tests/test_utils_crypto.py @@ -0,0 +1,550 @@ +"""tests for passlib.utils.(des|pbkdf2|md4)""" +#========================================================= +#imports +#========================================================= +from __future__ import with_statement +#core +from binascii import hexlify, unhexlify +import sys +import random +import warnings +#site +#pkg +#module +from passlib.utils.compat import b, bytes, bascii_to_str, irange, PY2, PY3, u, \ + unicode, join_bytes +from passlib.tests.utils import TestCase, Params as ak, enable_option, catch_warnings + +#========================================================= +# support +#========================================================= +def hb(source): + return unhexlify(b(source)) + +#========================================================= +#test des module +#========================================================= +class DesTest(TestCase): + + # test vectors taken from http://www.skepticfiles.org/faq/testdes.htm + des_test_vectors = [ + # key, plaintext, ciphertext + (0x0000000000000000, 0x0000000000000000, 0x8CA64DE9C1B123A7), + (0xFFFFFFFFFFFFFFFF, 0xFFFFFFFFFFFFFFFF, 0x7359B2163E4EDC58), + (0x3000000000000000, 0x1000000000000001, 0x958E6E627A05557B), + (0x1111111111111111, 0x1111111111111111, 0xF40379AB9E0EC533), + (0x0123456789ABCDEF, 0x1111111111111111, 0x17668DFC7292532D), + (0x1111111111111111, 0x0123456789ABCDEF, 0x8A5AE1F81AB8F2DD), + (0x0000000000000000, 0x0000000000000000, 0x8CA64DE9C1B123A7), + (0xFEDCBA9876543210, 0x0123456789ABCDEF, 0xED39D950FA74BCC4), + (0x7CA110454A1A6E57, 0x01A1D6D039776742, 0x690F5B0D9A26939B), + (0x0131D9619DC1376E, 0x5CD54CA83DEF57DA, 0x7A389D10354BD271), + (0x07A1133E4A0B2686, 0x0248D43806F67172, 0x868EBB51CAB4599A), + (0x3849674C2602319E, 0x51454B582DDF440A, 0x7178876E01F19B2A), + (0x04B915BA43FEB5B6, 0x42FD443059577FA2, 0xAF37FB421F8C4095), + (0x0113B970FD34F2CE, 0x059B5E0851CF143A, 0x86A560F10EC6D85B), + (0x0170F175468FB5E6, 0x0756D8E0774761D2, 0x0CD3DA020021DC09), + (0x43297FAD38E373FE, 0x762514B829BF486A, 0xEA676B2CB7DB2B7A), + (0x07A7137045DA2A16, 0x3BDD119049372802, 0xDFD64A815CAF1A0F), + (0x04689104C2FD3B2F, 0x26955F6835AF609A, 0x5C513C9C4886C088), + (0x37D06BB516CB7546, 0x164D5E404F275232, 0x0A2AEEAE3FF4AB77), + (0x1F08260D1AC2465E, 0x6B056E18759F5CCA, 0xEF1BF03E5DFA575A), + (0x584023641ABA6176, 0x004BD6EF09176062, 0x88BF0DB6D70DEE56), + (0x025816164629B007, 0x480D39006EE762F2, 0xA1F9915541020B56), + (0x49793EBC79B3258F, 0x437540C8698F3CFA, 0x6FBF1CAFCFFD0556), + (0x4FB05E1515AB73A7, 0x072D43A077075292, 0x2F22E49BAB7CA1AC), + (0x49E95D6D4CA229BF, 0x02FE55778117F12A, 0x5A6B612CC26CCE4A), + (0x018310DC409B26D6, 0x1D9D5C5018F728C2, 0x5F4C038ED12B2E41), + (0x1C587F1C13924FEF, 0x305532286D6F295A, 0x63FAC0D034D9F793), + (0x0101010101010101, 0x0123456789ABCDEF, 0x617B3A0CE8F07100), + (0x1F1F1F1F0E0E0E0E, 0x0123456789ABCDEF, 0xDB958605F8C8C606), + (0xE0FEE0FEF1FEF1FE, 0x0123456789ABCDEF, 0xEDBFD1C66C29CCC7), + (0x0000000000000000, 0xFFFFFFFFFFFFFFFF, 0x355550B2150E2451), + (0xFFFFFFFFFFFFFFFF, 0x0000000000000000, 0xCAAAAF4DEAF1DBAE), + (0x0123456789ABCDEF, 0x0000000000000000, 0xD5D44FF720683D0D), + (0xFEDCBA9876543210, 0xFFFFFFFFFFFFFFFF, 0x2A2BB008DF97C2F2), + ] + + def test_01_expand(self): + "test expand_des_key()" + from passlib.utils.des import expand_des_key, shrink_des_key, \ + _KDATA_MASK, INT_56_MASK + + # make sure test vectors are preserved (sans parity bits) + # uses ints, bytes are tested under #02 + for key1, _, _ in self.des_test_vectors: + key2 = shrink_des_key(key1) + key3 = expand_des_key(key2) + # NOTE: this assumes expand_des_key() sets parity bits to 0 + self.assertEqual(key3, key1 & _KDATA_MASK) + + # type checks + self.assertRaises(TypeError, expand_des_key, 1.0) + + # too large + self.assertRaises(ValueError, expand_des_key, INT_56_MASK+1) + self.assertRaises(ValueError, expand_des_key, b("\x00")*8) + + # too small + self.assertRaises(ValueError, expand_des_key, -1) + self.assertRaises(ValueError, expand_des_key, b("\x00")*6) + + def test_02_shrink(self): + "test shrink_des_key()" + from passlib.utils.des import expand_des_key, shrink_des_key, \ + INT_64_MASK + from passlib.utils import random, getrandbytes + + # make sure reverse works for some random keys + # uses bytes, ints are tested under #01 + for i in range(20): + key1 = getrandbytes(random, 7) + key2 = expand_des_key(key1) + key3 = shrink_des_key(key2) + self.assertEqual(key3, key1) + + # type checks + self.assertRaises(TypeError, shrink_des_key, 1.0) + + # too large + self.assertRaises(ValueError, shrink_des_key, INT_64_MASK+1) + self.assertRaises(ValueError, shrink_des_key, b("\x00")*9) + + # too small + self.assertRaises(ValueError, shrink_des_key, -1) + self.assertRaises(ValueError, shrink_des_key, b("\x00")*7) + + def _random_parity(self, key): + "randomize parity bits" + from passlib.utils.des import _KDATA_MASK, _KPARITY_MASK, INT_64_MASK + from passlib.utils import rng + return (key & _KDATA_MASK) | (rng.randint(0,INT_64_MASK) & _KPARITY_MASK) + + def test_03_encrypt_bytes(self): + "test des_encrypt_block()" + from passlib.utils.des import (des_encrypt_block, shrink_des_key, + _pack64, _unpack64) + + # run through test vectors + for key, plaintext, correct in self.des_test_vectors: + # convert to bytes + key = _pack64(key) + plaintext = _pack64(plaintext) + correct = _pack64(correct) + + # test 64-bit key + result = des_encrypt_block(key, plaintext) + self.assertEqual(result, correct, "key=%r plaintext=%r:" % + (key, plaintext)) + + # test 56-bit version + key2 = shrink_des_key(key) + result = des_encrypt_block(key2, plaintext) + self.assertEqual(result, correct, "key=%r shrink(key)=%r plaintext=%r:" % + (key, key2, plaintext)) + + # test with random parity bits + for _ in range(20): + key3 = _pack64(self._random_parity(_unpack64(key))) + result = des_encrypt_block(key3, plaintext) + self.assertEqual(result, correct, "key=%r rndparity(key)=%r plaintext=%r:" % + (key, key3, plaintext)) + + # check invalid keys + stub = b('\x00') * 8 + self.assertRaises(TypeError, des_encrypt_block, 0, stub) + self.assertRaises(ValueError, des_encrypt_block, b('\x00')*6, stub) + + # check invalid input + self.assertRaises(TypeError, des_encrypt_block, stub, 0) + self.assertRaises(ValueError, des_encrypt_block, stub, b('\x00')*7) + + # check invalid salts + self.assertRaises(ValueError, des_encrypt_block, stub, stub, salt=-1) + self.assertRaises(ValueError, des_encrypt_block, stub, stub, salt=1<<24) + + # check invalid rounds + self.assertRaises(ValueError, des_encrypt_block, stub, stub, 0, rounds=0) + + def test_04_encrypt_ints(self): + "test des_encrypt_int_block()" + from passlib.utils.des import (des_encrypt_int_block, shrink_des_key) + + # run through test vectors + for key, plaintext, correct in self.des_test_vectors: + # test 64-bit key + result = des_encrypt_int_block(key, plaintext) + self.assertEqual(result, correct, "key=%r plaintext=%r:" % + (key, plaintext)) + + # test with random parity bits + for _ in range(20): + key3 = self._random_parity(key) + result = des_encrypt_int_block(key3, plaintext) + self.assertEqual(result, correct, "key=%r rndparity(key)=%r plaintext=%r:" % + (key, key3, plaintext)) + + # check invalid keys + self.assertRaises(TypeError, des_encrypt_int_block, b('\x00'), 0) + self.assertRaises(ValueError, des_encrypt_int_block, -1, 0) + + # check invalid input + self.assertRaises(TypeError, des_encrypt_int_block, 0, b('\x00')) + self.assertRaises(ValueError, des_encrypt_int_block, 0, -1) + + # check invalid salts + self.assertRaises(ValueError, des_encrypt_int_block, 0, 0, salt=-1) + self.assertRaises(ValueError, des_encrypt_int_block, 0, 0, salt=1<<24) + + # check invalid rounds + self.assertRaises(ValueError, des_encrypt_int_block, 0, 0, 0, rounds=0) + +#========================================================= +#test md4 +#========================================================= +class _MD4_Test(TestCase): + #test vectors from http://www.faqs.org/rfcs/rfc1320.html - A.5 + + hash = None + + vectors = [ + # input -> hex digest + (b(""), "31d6cfe0d16ae931b73c59d7e0c089c0"), + (b("a"), "bde52cb31de33e46245e05fbdbd6fb24"), + (b("abc"), "a448017aaf21d8525fc10ae87aa6729d"), + (b("message digest"), "d9130a8164549fe818874806e1c7014b"), + (b("abcdefghijklmnopqrstuvwxyz"), "d79e1c308aa5bbcdeea8ed63df412da9"), + (b("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"), "043f8582f241db351ce627e153e7f0e4"), + (b("12345678901234567890123456789012345678901234567890123456789012345678901234567890"), "e33b4ddc9c38f2199c3e7b164fcc0536"), + ] + + def test_md4_update(self): + "test md4 update" + md4 = self.hash + h = md4(b('')) + self.assertEqual(h.hexdigest(), "31d6cfe0d16ae931b73c59d7e0c089c0") + + #NOTE: under py2, hashlib methods try to encode to ascii, + # though shouldn't rely on that. + if PY3: + self.assertRaises(TypeError, h.update, u('x')) + + h.update(b('a')) + self.assertEqual(h.hexdigest(), "bde52cb31de33e46245e05fbdbd6fb24") + + h.update(b('bcdefghijklmnopqrstuvwxyz')) + self.assertEqual(h.hexdigest(), "d79e1c308aa5bbcdeea8ed63df412da9") + + def test_md4_hexdigest(self): + "test md4 hexdigest()" + md4 = self.hash + for input, hex in self.vectors: + out = md4(input).hexdigest() + self.assertEqual(out, hex) + + def test_md4_digest(self): + "test md4 digest()" + md4 = self.hash + for input, hex in self.vectors: + out = bascii_to_str(hexlify(md4(input).digest())) + self.assertEqual(out, hex) + + def test_md4_copy(self): + "test md4 copy()" + md4 = self.hash + h = md4(b('abc')) + + h2 = h.copy() + h2.update(b('def')) + self.assertEqual(h2.hexdigest(), '804e7f1c2586e50b49ac65db5b645131') + + h.update(b('ghi')) + self.assertEqual(h.hexdigest(), 'c5225580bfe176f6deeee33dee98732c') + +# +#now do a bunch of things to test multiple possible backends. +# +import passlib.utils.md4 as md4_mod + +has_ssl_md4 = (md4_mod.md4 is not md4_mod._builtin_md4) + +if has_ssl_md4: + class MD4_SSL_Test(_MD4_Test): + descriptionPrefix = "MD4 (SSL version)" + hash = staticmethod(md4_mod.md4) + +if not has_ssl_md4 or enable_option("cover"): + class MD4_Builtin_Test(_MD4_Test): + descriptionPrefix = "MD4 (builtin version)" + hash = md4_mod._builtin_md4 + +#========================================================= +#test passlib.utils.pbkdf2 +#========================================================= +import hashlib +import hmac +from passlib.utils import pbkdf2 + +#TODO: should we bother testing hmac_sha1() function? it's verified via sha1_crypt testing. +class CryptoTest(TestCase): + "test various crypto functions" + + ndn_formats = ["hashlib", "iana"] + ndn_values = [ + # (iana name, hashlib name, ... other unnormalized names) + ("md5", "md5", "SCRAM-MD5-PLUS", "MD-5"), + ("sha1", "sha-1", "SCRAM-SHA-1", "SHA1"), + ("sha256", "sha-256", "SHA_256", "sha2-256"), + ("ripemd", "ripemd", "SCRAM-RIPEMD", "RIPEMD"), + ("ripemd160", "ripemd-160", + "SCRAM-RIPEMD-160", "RIPEmd160"), + ("test128", "test-128", "TEST128"), + ("test2", "test2", "TEST-2"), + ("test3128", "test3-128", "TEST-3-128"), + ] + + def test_norm_hash_name(self): + "test norm_hash_name()" + from itertools import chain + from passlib.utils.pbkdf2 import norm_hash_name, _nhn_hash_names + + # test formats + for format in self.ndn_formats: + norm_hash_name("md4", format) + self.assertRaises(ValueError, norm_hash_name, "md4", None) + self.assertRaises(ValueError, norm_hash_name, "md4", "fake") + + # test types + self.assertEqual(norm_hash_name(u("MD4")), "md4") + self.assertEqual(norm_hash_name(b("MD4")), "md4") + self.assertRaises(TypeError, norm_hash_name, None) + + # test selected results + with catch_warnings(): + warnings.filterwarnings("ignore", '.*unknown hash') + for row in chain(_nhn_hash_names, self.ndn_values): + for idx, format in enumerate(self.ndn_formats): + correct = row[idx] + for value in row: + result = norm_hash_name(value, format) + self.assertEqual(result, correct, + "name=%r, format=%r:" % (value, + format)) + +class KdfTest(TestCase): + "test kdf helpers" + + def test_pbkdf1(self): + "test pbkdf1" + for secret, salt, rounds, klen, hash, correct in [ + #http://www.di-mgt.com.au/cryptoKDFs.html + (b('password'), hb('78578E5A5D63CB06'), 1000, 16, 'sha1', + hb('dc19847e05c64d2faf10ebfb4a3d2a20')), + ]: + result = pbkdf2.pbkdf1(secret, salt, rounds, klen, hash) + self.assertEqual(result, correct) + + #test rounds < 1 + #test klen < 0 + #test klen > block size + #test invalid hash + +#NOTE: this is not run directly, but via two subclasses (below) +class _Pbkdf2BackendTest(TestCase): + "test builtin unix crypt backend" + enable_m2crypto = False + + def setUp(self): + #disable m2crypto support so we'll always use software backend + if not self.enable_m2crypto: + self._orig_EVP = pbkdf2._EVP + pbkdf2._EVP = None + else: + #set flag so tests can check for m2crypto presence quickly + self.enable_m2crypto = bool(pbkdf2._EVP) + pbkdf2._clear_prf_cache() + + def tearDown(self): + if not self.enable_m2crypto: + pbkdf2._EVP = self._orig_EVP + pbkdf2._clear_prf_cache() + + #TODO: test get_prf() behavior in various situations - though overall behavior tested via pbkdf2 + + def test_rfc3962(self): + "rfc3962 test vectors" + self.assertFunctionResults(pbkdf2.pbkdf2, [ + # result, secret, salt, rounds, keylen, digest="sha1" + + #test case 1 / 128 bit + ( + hb("cdedb5281bb2f801565a1122b2563515"), + b("password"), b("ATHENA.MIT.EDUraeburn"), 1, 16 + ), + + #test case 2 / 128 bit + ( + hb("01dbee7f4a9e243e988b62c73cda935d"), + b("password"), b("ATHENA.MIT.EDUraeburn"), 2, 16 + ), + + #test case 2 / 256 bit + ( + hb("01dbee7f4a9e243e988b62c73cda935da05378b93244ec8f48a99e61ad799d86"), + b("password"), b("ATHENA.MIT.EDUraeburn"), 2, 32 + ), + + #test case 3 / 256 bit + ( + hb("5c08eb61fdf71e4e4ec3cf6ba1f5512ba7e52ddbc5e5142f708a31e2e62b1e13"), + b("password"), b("ATHENA.MIT.EDUraeburn"), 1200, 32 + ), + + #test case 4 / 256 bit + ( + hb("d1daa78615f287e6a1c8b120d7062a493f98d203e6be49a6adf4fa574b6e64ee"), + b("password"), b('\x12\x34\x56\x78\x78\x56\x34\x12'), 5, 32 + ), + + #test case 5 / 256 bit + ( + hb("139c30c0966bc32ba55fdbf212530ac9c5ec59f1a452f5cc9ad940fea0598ed1"), + b("X"*64), b("pass phrase equals block size"), 1200, 32 + ), + + #test case 6 / 256 bit + ( + hb("9ccad6d468770cd51b10e6a68721be611a8b4d282601db3b36be9246915ec82a"), + b("X"*65), b("pass phrase exceeds block size"), 1200, 32 + ), + ]) + + def test_rfc6070(self): + "rfc6070 test vectors" + self.assertFunctionResults(pbkdf2.pbkdf2, [ + + ( + hb("0c60c80f961f0e71f3a9b524af6012062fe037a6"), + b("password"), b("salt"), 1, 20, + ), + + ( + hb("ea6c014dc72d6f8ccd1ed92ace1d41f0d8de8957"), + b("password"), b("salt"), 2, 20, + ), + + ( + hb("4b007901b765489abead49d926f721d065a429c1"), + b("password"), b("salt"), 4096, 20, + ), + + #just runs too long - could enable if ALL option is set + ##( + ## + ## unhexlify("eefe3d61cd4da4e4e9945b3d6ba2158c2634e984"), + ## "password", "salt", 16777216, 20, + ##), + + ( + hb("3d2eec4fe41c849b80c8d83662c0e44a8b291a964cf2f07038"), + b("passwordPASSWORDpassword"), + b("saltSALTsaltSALTsaltSALTsaltSALTsalt"), + 4096, 25, + ), + + ( + hb("56fa6aa75548099dcc37d7f03425e0c3"), + b("pass\00word"), b("sa\00lt"), 4096, 16, + ), + ]) + + def test_invalid_values(self): + + #invalid rounds + self.assertRaises(ValueError, pbkdf2.pbkdf2, b('password'), b('salt'), -1, 16) + self.assertRaises(ValueError, pbkdf2.pbkdf2, b('password'), b('salt'), 0, 16) + self.assertRaises(TypeError, pbkdf2.pbkdf2, b('password'), b('salt'), 'x', 16) + + #invalid keylen + self.assertRaises(ValueError, pbkdf2.pbkdf2, b('password'), b('salt'), + 1, 20*(2**32-1)+1) + + #invalid salt type + self.assertRaises(TypeError, pbkdf2.pbkdf2, b('password'), 5, 1, 10) + + #invalid secret type + self.assertRaises(TypeError, pbkdf2.pbkdf2, 5, b('salt'), 1, 10) + + #invalid hash + self.assertRaises(ValueError, pbkdf2.pbkdf2, b('password'), b('salt'), 1, 16, 'hmac-foo') + self.assertRaises(ValueError, pbkdf2.pbkdf2, b('password'), b('salt'), 1, 16, 'foo') + self.assertRaises(TypeError, pbkdf2.pbkdf2, b('password'), b('salt'), 1, 16, 5) + + def test_default_keylen(self): + "test keylen==-1" + self.assertEqual(len(pbkdf2.pbkdf2(b('password'), b('salt'), 1, -1, + prf='hmac-sha1')), 20) + + self.assertEqual(len(pbkdf2.pbkdf2(b('password'), b('salt'), 1, -1, + prf='hmac-sha256')), 32) + + def test_hmac_sha1(self): + "test independant hmac_sha1() method" + self.assertEqual( + pbkdf2.hmac_sha1(b("secret"), b("salt")), + b('\xfc\xd4\x0c;]\r\x97\xc6\xf1S\x8d\x93\xb9\xeb\xc6\x00\x04.\x8b\xfe') + ) + + def test_sha1_string(self): + "test various prf values" + self.assertEqual( + pbkdf2.pbkdf2(b("secret"), b("salt"), 10, 16, "hmac-sha1"), + b('\xe2H\xfbk\x136QF\xf8\xacc\x07\xcc"(\x12') + ) + + def test_sha512_string(self): + "test alternate digest string (sha512)" + self.assertFunctionResults(pbkdf2.pbkdf2, [ + # result, secret, salt, rounds, keylen, digest="sha1" + + #case taken from example in http://grub.enbug.org/Authentication + ( + hb("887CFF169EA8335235D8004242AA7D6187A41E3187DF0CE14E256D85ED97A97357AAA8FF0A3871AB9EEFF458392F462F495487387F685B7472FC6C29E293F0A0"), + b("hello"), + hb("9290F727ED06C38BA4549EF7DE25CF5642659211B7FC076F2D28FEFD71784BB8D8F6FB244A8CC5C06240631B97008565A120764C0EE9C2CB0073994D79080136"), + 10000, 64, "hmac-sha512" + ), + ]) + + def test_sha512_function(self): + "test custom digest function" + def prf(key, msg): + return hmac.new(key, msg, hashlib.sha512).digest() + + self.assertFunctionResults(pbkdf2.pbkdf2, [ + # result, secret, salt, rounds, keylen, digest="sha1" + + #case taken from example in http://grub.enbug.org/Authentication + ( + hb("887CFF169EA8335235D8004242AA7D6187A41E3187DF0CE14E256D85ED97A97357AAA8FF0A3871AB9EEFF458392F462F495487387F685B7472FC6C29E293F0A0"), + b("hello"), + hb("9290F727ED06C38BA4549EF7DE25CF5642659211B7FC076F2D28FEFD71784BB8D8F6FB244A8CC5C06240631B97008565A120764C0EE9C2CB0073994D79080136"), + 10000, 64, prf, + ), + ]) + +has_m2crypto = (pbkdf2._EVP is not None) + +if has_m2crypto: + class Pbkdf2_M2Crypto_Test(_Pbkdf2BackendTest): + descriptionPrefix = "pbkdf2 (m2crypto backend)" + enable_m2crypto = True + +if not has_m2crypto or enable_option("cover"): + class Pbkdf2_Builtin_Test(_Pbkdf2BackendTest): + descriptionPrefix = "pbkdf2 (builtin backend)" + enable_m2crypto = False + +#========================================================= +#EOF +#========================================================= diff --git a/passlib/tests/tox_support.py b/passlib/tests/tox_support.py new file mode 100644 index 0000000..7da0546 --- /dev/null +++ b/passlib/tests/tox_support.py @@ -0,0 +1,37 @@ +"""passlib.tests.tox_support - helper script for tox tests""" +#============================================================================= +# imports +#============================================================================= +# core +import os +import logging; log = logging.getLogger(__name__) +# site +# pkg +# local +__all__ = [ +] + +#============================================================================= +# main +#============================================================================= +def main(path, runtime): + "write fake GAE ``app.yaml`` to current directory so nosegae will work" + from passlib.tests.utils import set_file + set_file(os.path.join(path, "app.yaml"), """\ +application: fake-app +version: 2 +runtime: %s +api_version: 1 + +handlers: +- url: /.* + script: dummy.py +""" % runtime) + +if __name__ == "__main__": + import sys + sys.exit(main(*sys.argv[1:]) or 0) + +#============================================================================= +# eof +#============================================================================= diff --git a/passlib/tests/utils.py b/passlib/tests/utils.py index 713824e..5cbb427 100644 --- a/passlib/tests/utils.py +++ b/passlib/tests/utils.py @@ -42,7 +42,7 @@ from passlib.utils import has_rounds_info, has_salt_info, rounds_cost_values, \ classproperty, rng, getrandstr, is_ascii_safe, to_native_str, \ repeat_string from passlib.utils.compat import b, bytes, iteritems, irange, callable, \ - base_string_types, exc_err, u, unicode + base_string_types, exc_err, u, unicode, PY2 import passlib.utils.handlers as uh #local __all__ = [ @@ -134,6 +134,18 @@ def get_file(path): with open(path, "rb") as fh: return fh.read() +def tonn(source): + "convert native string to non-native string" + if not isinstance(source, str): + return source + elif PY3: + return source.encode("utf-8") + else: + try: + return source.decode("utf-8") + except UnicodeDecodeError: + return source.decode("latin-1") + #========================================================= #custom test base #========================================================= @@ -493,6 +505,13 @@ class TestCase(unittest.TestCase): msg = "error for case %r:" % (elem.render(1),) self.assertEqual(result, correct, msg) + def require_stringprep(self): + "helper to skip test if stringprep is missing" + from passlib.utils import stringprep + if not stringprep: + from passlib.utils import _stringprep_missing_reason + raise self.skipTest("not available - stringprep module is " + + _stringprep_missing_reason) #============================================================ #eoc #============================================================ @@ -741,7 +760,7 @@ class HandlerCase(TestCase): # XXX: any more checks needed? - def test_02_config(self): + def test_02_config_workflow(self): """test basic config-string workflow this tests that genconfig() returns the expected types, @@ -785,7 +804,7 @@ class HandlerCase(TestCase): else: self.assertRaises(TypeError, self.do_identify, config) - def test_03_hash(self): + def test_03_hash_workflow(self): """test basic hash-string workflow. this tests that encrypt()'s hashes are accepted @@ -835,8 +854,36 @@ class HandlerCase(TestCase): # self.assertTrue(self.do_identify(result)) + def test_04_hash_types(self): + "test hashes can be unicode or bytes" + # this runs through workflow similar to 03, but wraps + # everything using tonn() so we test unicode under py2, + # and bytes under py3. + + # encrypt using non-native secret + result = self.do_encrypt(tonn('stub')) + self.check_returned_native_str(result, "encrypt") + + # verify using non-native hash + self.check_verify('stub', tonn(result)) + + # verify using non-native hash AND secret + self.check_verify(tonn('stub'), tonn(result)) - def test_04_backends(self): + # genhash using non-native hash + other = self.do_genhash('stub', tonn(result)) + self.check_returned_native_str(other, "genhash") + self.assertEqual(other, result) + + # genhash using non-native hash AND secret + other = self.do_genhash(tonn('stub'), tonn(result)) + self.check_returned_native_str(other, "genhash") + self.assertEqual(other, result) + + # identify using non-native hash + self.assertTrue(self.do_identify(tonn(result))) + + def test_05_backends(self): "test multi-backend support" handler = self.handler if not hasattr(handler, "set_backend"): @@ -1065,6 +1112,34 @@ class HandlerCase(TestCase): self.assertRaises(ValueError, self.do_genconfig, salt=c*chunk, __msg__="invalid salt char %r:" % (c,)) + @property + def salt_type(self): + "hack to determine salt keyword's datatype" + # NOTE: cisco_type7 uses 'int' + if getattr(self.handler, "_salt_is_bytes", False): + return bytes + else: + return unicode + + def test_15_salt_type(self): + "test non-string salt values" + self.require_salt() + salt_type = self.salt_type + + # should always throw error for random class. + class fake(object): + pass + self.assertRaises(TypeError, self.do_encrypt, 'stub', salt=fake()) + + # unicode should be accepted only if salt_type is unicode. + if salt_type is not unicode: + self.assertRaises(TypeError, self.do_encrypt, 'stub', salt=u('x')) + + # bytes should be accepted only if salt_type is bytes, + # OR if salt type is unicode and running PY2 - to allow native strings. + if not (salt_type is bytes or (PY2 and salt_type is unicode)): + self.assertRaises(TypeError, self.do_encrypt, 'stub', salt=b('x')) + #============================================================== # rounds #============================================================== @@ -1561,12 +1636,15 @@ class HandlerCase(TestCase): # run through all verifiers we found. for verify in verifiers: name = vname(verify) - - if not verify(secret, hash, **ctx): + result = verify(secret, hash, **ctx) + if result == "skip": # let verifiers signal lack of support + continue + assert result is True or result is False + if not result: raise self.failureException("failed to verify against %s: " "secret=%r config=%r hash=%r" % (name, secret, kwds, hash)) - # occasionally check that some other secret WON'T verify + # occasionally check that some other secrets WON'T verify # against this hash. if rng.random() < .1 and verify(other, hash, **ctx): raise self.failureException("was able to verify wrong " @@ -1588,13 +1666,16 @@ class HandlerCase(TestCase): handler = self.handler verifiers = [] - # test against self - def check_default(secret, hash, **ctx): - "self" - return self.do_verify(secret, hash, **ctx) - verifiers.append(check_default) + # call all methods starting with prefix in order to create + # any verifiers. + prefix = "fuzz_verifier_" + for name in dir(self): + if name.startswith(prefix): + func = getattr(self, name)() + if func is not None: + verifiers.append(func) - # test against any other available backends + # create verifiers for any other available backends if hasattr(handler, "backends") and enable_option("all-backends"): def maker(backend): def func(secret, hash): @@ -1604,23 +1685,41 @@ class HandlerCase(TestCase): func.__doc__ = backend + "-backend" return func cur = handler.get_backend() - check_default.__doc__ = cur + "-backend" for backend in handler.backends: if backend != cur and handler.has_backend(backend): verifiers.append(maker(backend)) + return verifiers + + def fuzz_verifier_default(self): + # test against self + def check_default(secret, hash, **ctx): + return self.do_verify(secret, hash, **ctx) + if self.backend: + check_default.__doc__ = self.backend + "-backend" + else: + check_default.__doc__ = "self" + return check_default + + def os_supports_ident(self, ident): + "skip verifier_crypt when OS doesn't support ident" + return True + + def fuzz_verifier_crypt(self): # test againt OS crypt() # NOTE: skipping this if using_patched_crypt since _has_crypt_support() # will return false positive in that case. - if not self.using_patched_crypt and _has_crypt_support(handler): - from crypt import crypt - def check_crypt(secret, hash): - "stdlib-crypt" - secret = to_native_str(secret, self.fuzz_password_encoding) - return crypt(secret, hash) == hash - verifiers.append(check_crypt) - - return verifiers + handler = self.handler + if self.using_patched_crypt or not _has_crypt_support(handler): + return None + from crypt import crypt + def check_crypt(secret, hash): + "stdlib-crypt" + if not self.os_supports_ident(hash): + return "skip" + secret = to_native_str(secret, self.fuzz_password_encoding) + return crypt(secret, hash) == hash + return check_crypt def get_fuzz_password(self): "generate random passwords (for fuzz testing)" @@ -1672,11 +1771,8 @@ class HandlerCase(TestCase): return rng.choice(handler.ident_values) #========================================================= - # test 8x - mixin tests - # test 9x - handler-specific tests - #========================================================= - - #========================================================= + # test 8x - mixin tests + # test 9x - handler-specific tests # eoc #========================================================= @@ -1699,9 +1795,6 @@ class OsCryptMixin(HandlerCase): # encodeds as os.platform prefixes. platform_crypt_support = dict() - # TODO: test that os_crypt support is detected correct on the expected - # platofrms. - #========================================================= # instance attrs #========================================================= @@ -1746,20 +1839,27 @@ class OsCryptMixin(HandlerCase): #========================================================= # custom tests #========================================================= - def test_80_faulty_crypt(self): - "test with faulty crypt()" - # patch safe_crypt to return mock value. + def _use_mock_crypt(self): + "patch safe_crypt() so it returns mock value" import passlib.utils as mod - self.addCleanup(setattr, mod, "_crypt", mod._crypt) + if not self.using_patched_crypt: + self.addCleanup(setattr, mod, "_crypt", mod._crypt) crypt_value = [None] mod._crypt = lambda secret, config: crypt_value[0] + def setter(value): + crypt_value[0] = value + return setter - # prepare framework + def test_80_faulty_crypt(self): + "test with faulty crypt()" hash = self.get_sample_hash()[1] exc_types = (AssertionError,) + setter = self._use_mock_crypt() def test(value): - crypt_value[0] = value + # set safe_crypt() to return specified value, and + # make sure assertion error is raised by handler. + setter(value) self.assertRaises(exc_types, self.do_genhash, "stub", hash) self.assertRaises(exc_types, self.do_encrypt, "stub") self.assertRaises(exc_types, self.do_verify, "stub", hash) @@ -1768,8 +1868,27 @@ class OsCryptMixin(HandlerCase): test(hash[:-1]) # detect too short test(hash + 'x') # detect too long - def test_81_crypt_support(self): - "test crypt support detection" + def test_81_crypt_fallback(self): + "test per-call crypt() fallback" + # set safe_crypt to return None + setter = self._use_mock_crypt() + setter(None) + if _find_alternate_backend(self.handler, "os_crypt"): + # handler should have a fallback to use + h1 = self.do_encrypt("stub") + h2 = self.do_genhash("stub", h1) + self.assertEqual(h2, h1) + self.assertTrue(self.do_verify("stub", h1)) + else: + # handler should give up + from passlib.exc import MissingBackendError + hash = self.get_sample_hash()[1] + self.assertRaises(MissingBackendError, self.do_encrypt, 'stub') + self.assertRaises(MissingBackendError, self.do_genhash, 'stub', hash) + self.assertRaises(MissingBackendError, self.do_verify, 'stub', hash) + + def test_82_crypt_support(self): + "test platform-specific crypt() support detection" platform = sys.platform for name, flag in self.platform_crypt_support.items(): if not platform.startswith(name): |