diff options
Diffstat (limited to 'passlib/tests/test_context_deprecated.py')
-rw-r--r-- | passlib/tests/test_context_deprecated.py | 1165 |
1 files changed, 1165 insertions, 0 deletions
diff --git a/passlib/tests/test_context_deprecated.py b/passlib/tests/test_context_deprecated.py new file mode 100644 index 0000000..f6d33d8 --- /dev/null +++ b/passlib/tests/test_context_deprecated.py @@ -0,0 +1,1165 @@ +"""tests for passlib.context + +this file is a clone of the 1.5 test_context.py, +containing the tests using the legacy CryptPolicy api. +it's being preserved here to ensure the old api doesn't break +(until Passlib 1.8, when this and the legacy api will be removed). +""" +#========================================================= +#imports +#========================================================= +from __future__ import with_statement +#core +import hashlib +from logging import getLogger +import os +import time +import warnings +import sys +#site +try: + from pkg_resources import resource_filename +except ImportError: + resource_filename = None +#pkg +from passlib import hash +from passlib.context import CryptContext, CryptPolicy, LazyCryptContext +from passlib.exc import PasslibConfigWarning +from passlib.utils import tick, to_bytes, to_unicode +from passlib.utils.compat import irange, u +import passlib.utils.handlers as uh +from passlib.tests.utils import TestCase, mktemp, catch_warnings, \ + gae_env, set_file +from passlib.registry import register_crypt_handler_path, has_crypt_handler, \ + _unload_handler_name as unload_handler_name +#module +log = getLogger(__name__) + +#========================================================= +# +#========================================================= +class CryptPolicyTest(TestCase): + "test CryptPolicy object" + + #TODO: need to test user categories w/in all this + + descriptionPrefix = "CryptPolicy" + + #========================================================= + #sample crypt policies used for testing + #========================================================= + + #----------------------------------------------------- + #sample 1 - average config file + #----------------------------------------------------- + #NOTE: copy of this is stored in file passlib/tests/sample_config_1s.cfg + sample_config_1s = """\ +[passlib] +schemes = des_crypt, md5_crypt, bsdi_crypt, sha512_crypt +default = md5_crypt +all.vary_rounds = 10%% +bsdi_crypt.max_rounds = 30000 +bsdi_crypt.default_rounds = 25000 +sha512_crypt.max_rounds = 50000 +sha512_crypt.min_rounds = 40000 +""" + sample_config_1s_path = os.path.abspath(os.path.join( + os.path.dirname(__file__), "sample_config_1s.cfg")) + if not os.path.exists(sample_config_1s_path) and resource_filename: + #in case we're zipped up in an egg. + sample_config_1s_path = resource_filename("passlib.tests", + "sample_config_1s.cfg") + + #make sure sample_config_1s uses \n linesep - tests rely on this + assert sample_config_1s.startswith("[passlib]\nschemes") + + sample_config_1pd = dict( + schemes = [ "des_crypt", "md5_crypt", "bsdi_crypt", "sha512_crypt"], + default = "md5_crypt", + # NOTE: not maintaining backwards compat for rendering to "10%" + all__vary_rounds = 0.1, + bsdi_crypt__max_rounds = 30000, + bsdi_crypt__default_rounds = 25000, + sha512_crypt__max_rounds = 50000, + sha512_crypt__min_rounds = 40000, + ) + + sample_config_1pid = { + "schemes": "des_crypt, md5_crypt, bsdi_crypt, sha512_crypt", + "default": "md5_crypt", + # NOTE: not maintaining backwards compat for rendering to "10%" + "all.vary_rounds": 0.1, + "bsdi_crypt.max_rounds": 30000, + "bsdi_crypt.default_rounds": 25000, + "sha512_crypt.max_rounds": 50000, + "sha512_crypt.min_rounds": 40000, + } + + sample_config_1prd = dict( + schemes = [ hash.des_crypt, hash.md5_crypt, hash.bsdi_crypt, hash.sha512_crypt], + default = "md5_crypt", # NOTE: passlib <= 1.5 was handler obj. + # NOTE: not maintaining backwards compat for rendering to "10%" + all__vary_rounds = 0.1, + bsdi_crypt__max_rounds = 30000, + bsdi_crypt__default_rounds = 25000, + sha512_crypt__max_rounds = 50000, + sha512_crypt__min_rounds = 40000, + ) + + #----------------------------------------------------- + #sample 2 - partial policy & result of overlay on sample 1 + #----------------------------------------------------- + sample_config_2s = """\ +[passlib] +bsdi_crypt.min_rounds = 29000 +bsdi_crypt.max_rounds = 35000 +bsdi_crypt.default_rounds = 31000 +sha512_crypt.min_rounds = 45000 +""" + + sample_config_2pd = dict( + #using this to test full replacement of existing options + bsdi_crypt__min_rounds = 29000, + bsdi_crypt__max_rounds = 35000, + bsdi_crypt__default_rounds = 31000, + #using this to test partial replacement of existing options + sha512_crypt__min_rounds=45000, + ) + + sample_config_12pd = dict( + schemes = [ "des_crypt", "md5_crypt", "bsdi_crypt", "sha512_crypt"], + default = "md5_crypt", + # NOTE: not maintaining backwards compat for rendering to "10%" + all__vary_rounds = 0.1, + bsdi_crypt__min_rounds = 29000, + bsdi_crypt__max_rounds = 35000, + bsdi_crypt__default_rounds = 31000, + sha512_crypt__max_rounds = 50000, + sha512_crypt__min_rounds=45000, + ) + + #----------------------------------------------------- + #sample 3 - just changing default + #----------------------------------------------------- + sample_config_3pd = dict( + default="sha512_crypt", + ) + + sample_config_123pd = dict( + schemes = [ "des_crypt", "md5_crypt", "bsdi_crypt", "sha512_crypt"], + default = "sha512_crypt", + # NOTE: not maintaining backwards compat for rendering to "10%" + all__vary_rounds = 0.1, + bsdi_crypt__min_rounds = 29000, + bsdi_crypt__max_rounds = 35000, + bsdi_crypt__default_rounds = 31000, + sha512_crypt__max_rounds = 50000, + sha512_crypt__min_rounds=45000, + ) + + #----------------------------------------------------- + #sample 4 - category specific + #----------------------------------------------------- + sample_config_4s = """ +[passlib] +schemes = sha512_crypt +all.vary_rounds = 10%% +default.sha512_crypt.max_rounds = 20000 +admin.all.vary_rounds = 5%% +admin.sha512_crypt.max_rounds = 40000 +""" + + sample_config_4pd = dict( + schemes = [ "sha512_crypt" ], + # NOTE: not maintaining backwards compat for rendering to "10%" + all__vary_rounds = 0.1, + sha512_crypt__max_rounds = 20000, + # NOTE: not maintaining backwards compat for rendering to "5%" + admin__all__vary_rounds = 0.05, + admin__sha512_crypt__max_rounds = 40000, + ) + + #----------------------------------------------------- + #sample 5 - to_string & deprecation testing + #----------------------------------------------------- + sample_config_5s = sample_config_1s + """\ +deprecated = des_crypt +admin__context__deprecated = des_crypt, bsdi_crypt +""" + + sample_config_5pd = sample_config_1pd.copy() + sample_config_5pd.update( + deprecated = [ "des_crypt" ], + admin__context__deprecated = [ "des_crypt", "bsdi_crypt" ], + ) + + sample_config_5pid = sample_config_1pid.copy() + sample_config_5pid.update({ + "deprecated": "des_crypt", + "admin.context.deprecated": "des_crypt, bsdi_crypt", + }) + + sample_config_5prd = sample_config_1prd.copy() + sample_config_5prd.update({ + # XXX: should deprecated return the actual handlers in this case? + # would have to modify how policy stores info, for one. + "deprecated": ["des_crypt"], + "admin__context__deprecated": ["des_crypt", "bsdi_crypt"], + }) + + #========================================================= + #constructors + #========================================================= + def setUp(self): + TestCase.setUp(self) + warnings.filterwarnings("ignore", + r"The CryptPolicy class has been deprecated") + + def test_00_constructor(self): + "test CryptPolicy() constructor" + policy = CryptPolicy(**self.sample_config_1pd) + self.assertEqual(policy.to_dict(), self.sample_config_1pd) + + #check key with too many separators is rejected + self.assertRaises(TypeError, CryptPolicy, + schemes = [ "des_crypt", "md5_crypt", "bsdi_crypt", "sha512_crypt"], + bad__key__bsdi_crypt__max_rounds = 30000, + ) + + #check nameless handler rejected + class nameless(uh.StaticHandler): + name = None + self.assertRaises(ValueError, CryptPolicy, schemes=[nameless]) + + # check scheme must be name or crypt handler + self.assertRaises(TypeError, CryptPolicy, schemes=[uh.StaticHandler]) + + #check name conflicts are rejected + class dummy_1(uh.StaticHandler): + name = 'dummy_1' + self.assertRaises(KeyError, CryptPolicy, schemes=[dummy_1, dummy_1]) + + #with unknown deprecated value + self.assertRaises(KeyError, CryptPolicy, + schemes=['des_crypt'], + deprecated=['md5_crypt']) + + #with unknown default value + self.assertRaises(KeyError, CryptPolicy, + schemes=['des_crypt'], + default='md5_crypt') + + def test_01_from_path_simple(self): + "test CryptPolicy.from_path() constructor" + #NOTE: this is separate so it can also run under GAE + + #test preset stored in existing file + path = self.sample_config_1s_path + policy = CryptPolicy.from_path(path) + self.assertEqual(policy.to_dict(), self.sample_config_1pd) + + #test if path missing + self.assertRaises(EnvironmentError, CryptPolicy.from_path, path + 'xxx') + + def test_01_from_path(self): + "test CryptPolicy.from_path() constructor with encodings" + if gae_env: + return self.skipTest("GAE doesn't offer read/write filesystem access") + + path = mktemp() + + #test "\n" linesep + set_file(path, self.sample_config_1s) + policy = CryptPolicy.from_path(path) + self.assertEqual(policy.to_dict(), self.sample_config_1pd) + + #test "\r\n" linesep + set_file(path, self.sample_config_1s.replace("\n","\r\n")) + policy = CryptPolicy.from_path(path) + self.assertEqual(policy.to_dict(), self.sample_config_1pd) + + #test with custom encoding + uc2 = to_bytes(self.sample_config_1s, "utf-16", source_encoding="utf-8") + set_file(path, uc2) + policy = CryptPolicy.from_path(path, encoding="utf-16") + self.assertEqual(policy.to_dict(), self.sample_config_1pd) + + def test_02_from_string(self): + "test CryptPolicy.from_string() constructor" + #test "\n" linesep + policy = CryptPolicy.from_string(self.sample_config_1s) + self.assertEqual(policy.to_dict(), self.sample_config_1pd) + + #test "\r\n" linesep + policy = CryptPolicy.from_string( + self.sample_config_1s.replace("\n","\r\n")) + self.assertEqual(policy.to_dict(), self.sample_config_1pd) + + #test with unicode + data = to_unicode(self.sample_config_1s) + policy = CryptPolicy.from_string(data) + self.assertEqual(policy.to_dict(), self.sample_config_1pd) + + #test with non-ascii-compatible encoding + uc2 = to_bytes(self.sample_config_1s, "utf-16", source_encoding="utf-8") + policy = CryptPolicy.from_string(uc2, encoding="utf-16") + self.assertEqual(policy.to_dict(), self.sample_config_1pd) + + #test category specific options + policy = CryptPolicy.from_string(self.sample_config_4s) + self.assertEqual(policy.to_dict(), self.sample_config_4pd) + + def test_03_from_source(self): + "test CryptPolicy.from_source() constructor" + #pass it a path + policy = CryptPolicy.from_source(self.sample_config_1s_path) + self.assertEqual(policy.to_dict(), self.sample_config_1pd) + + #pass it a string + policy = CryptPolicy.from_source(self.sample_config_1s) + self.assertEqual(policy.to_dict(), self.sample_config_1pd) + + #pass it a dict (NOTE: make a copy to detect in-place modifications) + policy = CryptPolicy.from_source(self.sample_config_1pd.copy()) + self.assertEqual(policy.to_dict(), self.sample_config_1pd) + + #pass it existing policy + p2 = CryptPolicy.from_source(policy) + self.assertIs(policy, p2) + + #pass it something wrong + self.assertRaises(TypeError, CryptPolicy.from_source, 1) + self.assertRaises(TypeError, CryptPolicy.from_source, []) + + def test_04_from_sources(self): + "test CryptPolicy.from_sources() constructor" + + #pass it empty list + self.assertRaises(ValueError, CryptPolicy.from_sources, []) + + #pass it one-element list + policy = CryptPolicy.from_sources([self.sample_config_1s]) + self.assertEqual(policy.to_dict(), self.sample_config_1pd) + + #pass multiple sources + policy = CryptPolicy.from_sources( + [ + self.sample_config_1s_path, + self.sample_config_2s, + self.sample_config_3pd, + ]) + self.assertEqual(policy.to_dict(), self.sample_config_123pd) + + def test_05_replace(self): + "test CryptPolicy.replace() constructor" + + p1 = CryptPolicy(**self.sample_config_1pd) + + #check overlaying sample 2 + p2 = p1.replace(**self.sample_config_2pd) + self.assertEqual(p2.to_dict(), self.sample_config_12pd) + + #check repeating overlay makes no change + p2b = p2.replace(**self.sample_config_2pd) + self.assertEqual(p2b.to_dict(), self.sample_config_12pd) + + #check overlaying sample 3 + p3 = p2.replace(self.sample_config_3pd) + self.assertEqual(p3.to_dict(), self.sample_config_123pd) + + def test_06_forbidden(self): + "test CryptPolicy() forbidden kwds" + + #salt not allowed to be set + self.assertRaises(KeyError, CryptPolicy, + schemes=["des_crypt"], + des_crypt__salt="xx", + ) + self.assertRaises(KeyError, CryptPolicy, + schemes=["des_crypt"], + all__salt="xx", + ) + + #schemes not allowed for category + self.assertRaises(KeyError, CryptPolicy, + schemes=["des_crypt"], + user__context__schemes=["md5_crypt"], + ) + + #========================================================= + #reading + #========================================================= + def test_10_has_schemes(self): + "test has_schemes() method" + + p1 = CryptPolicy(**self.sample_config_1pd) + self.assertTrue(p1.has_schemes()) + + p3 = CryptPolicy(**self.sample_config_3pd) + self.assertTrue(not p3.has_schemes()) + + def test_11_iter_handlers(self): + "test iter_handlers() method" + + p1 = CryptPolicy(**self.sample_config_1pd) + s = self.sample_config_1prd['schemes'] + self.assertEqual(list(p1.iter_handlers()), s) + + p3 = CryptPolicy(**self.sample_config_3pd) + self.assertEqual(list(p3.iter_handlers()), []) + + def test_12_get_handler(self): + "test get_handler() method" + + p1 = CryptPolicy(**self.sample_config_1pd) + + #check by name + self.assertIs(p1.get_handler("bsdi_crypt"), hash.bsdi_crypt) + + #check by missing name + self.assertIs(p1.get_handler("sha256_crypt"), None) + self.assertRaises(KeyError, p1.get_handler, "sha256_crypt", required=True) + + #check default + self.assertIs(p1.get_handler(), hash.md5_crypt) + + def test_13_get_options(self): + "test get_options() method" + + p12 = CryptPolicy(**self.sample_config_12pd) + + self.assertEqual(p12.get_options("bsdi_crypt"),dict( + # NOTE: not maintaining backwards compat for rendering to "10%" + vary_rounds = 0.1, + min_rounds = 29000, + max_rounds = 35000, + default_rounds = 31000, + )) + + self.assertEqual(p12.get_options("sha512_crypt"),dict( + # NOTE: not maintaining backwards compat for rendering to "10%" + vary_rounds = 0.1, + min_rounds = 45000, + max_rounds = 50000, + )) + + p4 = CryptPolicy.from_string(self.sample_config_4s) + self.assertEqual(p4.get_options("sha512_crypt"), dict( + # NOTE: not maintaining backwards compat for rendering to "10%" + vary_rounds=0.1, + max_rounds=20000, + )) + + self.assertEqual(p4.get_options("sha512_crypt", "user"), dict( + # NOTE: not maintaining backwards compat for rendering to "10%" + vary_rounds=0.1, + max_rounds=20000, + )) + + self.assertEqual(p4.get_options("sha512_crypt", "admin"), dict( + # NOTE: not maintaining backwards compat for rendering to "5%" + vary_rounds=0.05, + max_rounds=40000, + )) + + def test_14_handler_is_deprecated(self): + "test handler_is_deprecated() method" + pa = CryptPolicy(**self.sample_config_1pd) + pb = CryptPolicy(**self.sample_config_5pd) + + self.assertFalse(pa.handler_is_deprecated("des_crypt")) + self.assertFalse(pa.handler_is_deprecated(hash.bsdi_crypt)) + self.assertFalse(pa.handler_is_deprecated("sha512_crypt")) + + self.assertTrue(pb.handler_is_deprecated("des_crypt")) + self.assertFalse(pb.handler_is_deprecated(hash.bsdi_crypt)) + self.assertFalse(pb.handler_is_deprecated("sha512_crypt")) + + #check categories as well + self.assertTrue(pb.handler_is_deprecated("des_crypt", "user")) + self.assertFalse(pb.handler_is_deprecated("bsdi_crypt", "user")) + self.assertTrue(pb.handler_is_deprecated("des_crypt", "admin")) + self.assertTrue(pb.handler_is_deprecated("bsdi_crypt", "admin")) + + # check deprecation is overridden per category + pc = CryptPolicy( + schemes=["md5_crypt", "des_crypt"], + deprecated=["md5_crypt"], + user__context__deprecated=["des_crypt"], + ) + self.assertTrue(pc.handler_is_deprecated("md5_crypt")) + self.assertFalse(pc.handler_is_deprecated("des_crypt")) + self.assertFalse(pc.handler_is_deprecated("md5_crypt", "user")) + self.assertTrue(pc.handler_is_deprecated("des_crypt", "user")) + + def test_15_min_verify_time(self): + "test get_min_verify_time() method" + # silence deprecation warnings for min verify time + warnings.filterwarnings("ignore", category=DeprecationWarning) + + pa = CryptPolicy() + self.assertEqual(pa.get_min_verify_time(), 0) + self.assertEqual(pa.get_min_verify_time('admin'), 0) + + pb = pa.replace(min_verify_time=.1) + self.assertEqual(pb.get_min_verify_time(), .1) + self.assertEqual(pb.get_min_verify_time('admin'), .1) + + pc = pa.replace(admin__context__min_verify_time=.2) + self.assertEqual(pc.get_min_verify_time(), 0) + self.assertEqual(pc.get_min_verify_time('admin'), .2) + + pd = pb.replace(admin__context__min_verify_time=.2) + self.assertEqual(pd.get_min_verify_time(), .1) + self.assertEqual(pd.get_min_verify_time('admin'), .2) + + #========================================================= + #serialization + #========================================================= + def test_20_iter_config(self): + "test iter_config() method" + p5 = CryptPolicy(**self.sample_config_5pd) + self.assertEqual(dict(p5.iter_config()), self.sample_config_5pd) + self.assertEqual(dict(p5.iter_config(resolve=True)), self.sample_config_5prd) + self.assertEqual(dict(p5.iter_config(ini=True)), self.sample_config_5pid) + + def test_21_to_dict(self): + "test to_dict() method" + p5 = CryptPolicy(**self.sample_config_5pd) + self.assertEqual(p5.to_dict(), self.sample_config_5pd) + self.assertEqual(p5.to_dict(resolve=True), self.sample_config_5prd) + + def test_22_to_string(self): + "test to_string() method" + pa = CryptPolicy(**self.sample_config_5pd) + s = pa.to_string() #NOTE: can't compare string directly, ordering etc may not match + pb = CryptPolicy.from_string(s) + self.assertEqual(pb.to_dict(), self.sample_config_5pd) + + #========================================================= + # + #========================================================= + +#========================================================= +#CryptContext +#========================================================= +class CryptContextTest(TestCase): + "test CryptContext class" + descriptionPrefix = "CryptContext" + + def setUp(self): + TestCase.setUp(self) + warnings.filterwarnings("ignore", + r"CryptContext\(\)\.replace\(\) has been deprecated.*") + warnings.filterwarnings("ignore", + r"The CryptContext ``policy`` keyword has been deprecated.*") + warnings.filterwarnings("ignore", ".*(CryptPolicy|context\.policy).*(has|have) been deprecated.*") + + #========================================================= + #constructor + #========================================================= + def test_00_constructor(self): + "test constructor" + #create crypt context using handlers + cc = CryptContext([hash.md5_crypt, hash.bsdi_crypt, hash.des_crypt]) + c,b,a = cc.policy.iter_handlers() + self.assertIs(a, hash.des_crypt) + self.assertIs(b, hash.bsdi_crypt) + self.assertIs(c, hash.md5_crypt) + + #create context using names + cc = CryptContext(["md5_crypt", "bsdi_crypt", "des_crypt"]) + c,b,a = cc.policy.iter_handlers() + self.assertIs(a, hash.des_crypt) + self.assertIs(b, hash.bsdi_crypt) + self.assertIs(c, hash.md5_crypt) + + #TODO: test policy & other options + + def test_01_replace(self): + "test replace()" + + cc = CryptContext(["md5_crypt", "bsdi_crypt", "des_crypt"]) + self.assertIs(cc.policy.get_handler(), hash.md5_crypt) + + cc2 = cc.replace() + self.assertIsNot(cc2, cc) + # NOTE: was not able to maintain backward compatibility with this... + ##self.assertIs(cc2.policy, cc.policy) + + cc3 = cc.replace(default="bsdi_crypt") + self.assertIsNot(cc3, cc) + # NOTE: was not able to maintain backward compatibility with this... + ##self.assertIs(cc3.policy, cc.policy) + self.assertIs(cc3.policy.get_handler(), hash.bsdi_crypt) + + def test_02_no_handlers(self): + "test no handlers" + + #check constructor... + cc = CryptContext() + self.assertRaises(KeyError, cc.identify, 'hash', required=True) + self.assertRaises(KeyError, cc.encrypt, 'secret') + self.assertRaises(KeyError, cc.verify, 'secret', 'hash') + + #check updating policy after the fact... + cc = CryptContext(['md5_crypt']) + p = CryptPolicy(schemes=[]) + cc.policy = p + + self.assertRaises(KeyError, cc.identify, 'hash', required=True) + self.assertRaises(KeyError, cc.encrypt, 'secret') + self.assertRaises(KeyError, cc.verify, 'secret', 'hash') + + #========================================================= + #policy adaptation + #========================================================= + sample_policy_1 = dict( + schemes = [ "des_crypt", "md5_crypt", "phpass", "bsdi_crypt", + "sha256_crypt"], + deprecated = [ "des_crypt", ], + default = "sha256_crypt", + bsdi_crypt__max_rounds = 30, + bsdi_crypt__default_rounds = 25, + bsdi_crypt__vary_rounds = 0, + sha256_crypt__max_rounds = 3000, + sha256_crypt__min_rounds = 2000, + sha256_crypt__default_rounds = 3000, + phpass__ident = "H", + phpass__default_rounds = 7, + ) + + def test_10_01_genconfig_settings(self): + "test genconfig() settings" + cc = CryptContext(policy=None, + schemes=["md5_crypt", "phpass"], + phpass__ident="H", + phpass__default_rounds=7, + ) + + # hash specific settings + self.assertTrue(cc.genconfig().startswith("$1$")) + self.assertEqual( + cc.genconfig(scheme="phpass", salt='.'*8), + '$H$5........', + ) + self.assertEqual( + cc.genconfig(scheme="phpass", salt='.'*8, rounds=8, ident='P'), + '$P$6........', + ) + + # unsupported hash settings should be rejected + self.assertRaises(KeyError, cc.replace, md5_crypt__ident="P") + + def test_10_02_genconfig_rounds_limits(self): + "test genconfig() policy rounds limits" + cc = CryptContext(policy=None, + schemes=["sha256_crypt"], + all__min_rounds=2000, + all__max_rounds=3000, + all__default_rounds=2500, + ) + + # min rounds + with catch_warnings(record=True) as wlog: + + # set below handler min + c2 = cc.replace(all__min_rounds=500, all__max_rounds=None, + all__default_rounds=500) + self.consumeWarningList(wlog, [PasslibConfigWarning]*2) + self.assertEqual(c2.genconfig(salt="nacl"), "$5$rounds=1000$nacl$") + self.consumeWarningList(wlog) + + # below + self.assertEqual( + cc.genconfig(rounds=1999, salt="nacl"), + '$5$rounds=2000$nacl$', + ) + self.consumeWarningList(wlog, PasslibConfigWarning) + + # equal + self.assertEqual( + cc.genconfig(rounds=2000, salt="nacl"), + '$5$rounds=2000$nacl$', + ) + self.consumeWarningList(wlog) + + # above + self.assertEqual( + cc.genconfig(rounds=2001, salt="nacl"), + '$5$rounds=2001$nacl$' + ) + self.consumeWarningList(wlog) + + # max rounds + with catch_warnings(record=True) as wlog: + # set above handler max + c2 = cc.replace(all__max_rounds=int(1e9)+500, all__min_rounds=None, + all__default_rounds=int(1e9)+500) + self.consumeWarningList(wlog, [PasslibConfigWarning]*2) + self.assertEqual(c2.genconfig(salt="nacl"), + "$5$rounds=999999999$nacl$") + self.consumeWarningList(wlog) + + # above + self.assertEqual( + cc.genconfig(rounds=3001, salt="nacl"), + '$5$rounds=3000$nacl$' + ) + self.consumeWarningList(wlog, PasslibConfigWarning) + + # equal + self.assertEqual( + cc.genconfig(rounds=3000, salt="nacl"), + '$5$rounds=3000$nacl$' + ) + self.consumeWarningList(wlog) + + # below + self.assertEqual( + cc.genconfig(rounds=2999, salt="nacl"), + '$5$rounds=2999$nacl$', + ) + self.consumeWarningList(wlog) + + # explicit default rounds + self.assertEqual(cc.genconfig(salt="nacl"), '$5$rounds=2500$nacl$') + + # fallback default rounds - use handler's default + df = hash.sha256_crypt.default_rounds + c2 = cc.copy(all__default_rounds=None, all__max_rounds=df<<1) + self.assertEqual(c2.genconfig(salt="nacl"), + '$5$rounds=%d$nacl$' % df) + + # fallback default rounds - use handler's, but clipped to max rounds + c2 = cc.replace(all__default_rounds=None, all__max_rounds=3000) + self.assertEqual(c2.genconfig(salt="nacl"), '$5$rounds=3000$nacl$') + + # TODO: test default falls back to mx / mn if handler has no default. + + #default rounds - out of bounds + self.assertRaises(ValueError, cc.replace, all__default_rounds=1999) + cc.policy.replace(all__default_rounds=2000) + cc.policy.replace(all__default_rounds=3000) + self.assertRaises(ValueError, cc.replace, all__default_rounds=3001) + + # invalid min/max bounds + c2 = CryptContext(policy=None, schemes=["sha256_crypt"]) + self.assertRaises(ValueError, c2.replace, all__min_rounds=-1) + self.assertRaises(ValueError, c2.replace, all__max_rounds=-1) + self.assertRaises(ValueError, c2.replace, all__min_rounds=2000, + all__max_rounds=1999) + + def test_10_03_genconfig_linear_vary_rounds(self): + "test genconfig() linear vary rounds" + cc = CryptContext(policy=None, + schemes=["sha256_crypt"], + all__min_rounds=1995, + all__max_rounds=2005, + all__default_rounds=2000, + ) + + # test negative + self.assertRaises(ValueError, cc.replace, all__vary_rounds=-1) + self.assertRaises(ValueError, cc.replace, all__vary_rounds="-1%") + self.assertRaises(ValueError, cc.replace, all__vary_rounds="101%") + + # test static + c2 = cc.replace(all__vary_rounds=0) + self.assert_rounds_range(c2, "sha256_crypt", 2000, 2000) + + c2 = cc.replace(all__vary_rounds="0%") + self.assert_rounds_range(c2, "sha256_crypt", 2000, 2000) + + # test absolute + c2 = cc.replace(all__vary_rounds=1) + self.assert_rounds_range(c2, "sha256_crypt", 1999, 2001) + c2 = cc.replace(all__vary_rounds=100) + self.assert_rounds_range(c2, "sha256_crypt", 1995, 2005) + + # test relative + c2 = cc.replace(all__vary_rounds="0.1%") + self.assert_rounds_range(c2, "sha256_crypt", 1998, 2002) + c2 = cc.replace(all__vary_rounds="100%") + self.assert_rounds_range(c2, "sha256_crypt", 1995, 2005) + + def test_10_03_genconfig_log2_vary_rounds(self): + "test genconfig() log2 vary rounds" + cc = CryptContext(policy=None, + schemes=["bcrypt"], + all__min_rounds=15, + all__max_rounds=25, + all__default_rounds=20, + ) + + # test negative + self.assertRaises(ValueError, cc.replace, all__vary_rounds=-1) + self.assertRaises(ValueError, cc.replace, all__vary_rounds="-1%") + self.assertRaises(ValueError, cc.replace, all__vary_rounds="101%") + + # test static + c2 = cc.replace(all__vary_rounds=0) + self.assert_rounds_range(c2, "bcrypt", 20, 20) + + c2 = cc.replace(all__vary_rounds="0%") + self.assert_rounds_range(c2, "bcrypt", 20, 20) + + # test absolute + c2 = cc.replace(all__vary_rounds=1) + self.assert_rounds_range(c2, "bcrypt", 19, 21) + c2 = cc.replace(all__vary_rounds=100) + self.assert_rounds_range(c2, "bcrypt", 15, 25) + + # test relative - should shift over at 50% mark + c2 = cc.replace(all__vary_rounds="1%") + self.assert_rounds_range(c2, "bcrypt", 20, 20) + + c2 = cc.replace(all__vary_rounds="49%") + self.assert_rounds_range(c2, "bcrypt", 20, 20) + + c2 = cc.replace(all__vary_rounds="50%") + self.assert_rounds_range(c2, "bcrypt", 19, 20) + + c2 = cc.replace(all__vary_rounds="100%") + self.assert_rounds_range(c2, "bcrypt", 15, 21) + + def assert_rounds_range(self, context, scheme, lower, upper): + "helper to check vary_rounds covers specified range" + # NOTE: this runs enough times the min and max *should* be hit, + # though there's a faint chance it will randomly fail. + handler = context.policy.get_handler(scheme) + salt = handler.default_salt_chars[0:1] * handler.max_salt_size + seen = set() + for i in irange(300): + h = context.genconfig(scheme, salt=salt) + r = handler.from_string(h).rounds + seen.add(r) + self.assertEqual(min(seen), lower, "vary_rounds had wrong lower limit:") + self.assertEqual(max(seen), upper, "vary_rounds had wrong upper limit:") + + def test_11_encrypt_settings(self): + "test encrypt() honors policy settings" + cc = CryptContext(**self.sample_policy_1) + + # hash specific settings + self.assertEqual( + cc.encrypt("password", scheme="phpass", salt='.'*8), + '$H$5........De04R5Egz0aq8Tf.1eVhY/', + ) + self.assertEqual( + cc.encrypt("password", scheme="phpass", salt='.'*8, ident="P"), + '$P$5........De04R5Egz0aq8Tf.1eVhY/', + ) + + # NOTE: more thorough job of rounds limits done in genconfig() test, + # which is much cheaper, and shares the same codebase. + + # min rounds + with catch_warnings(record=True) as wlog: + self.assertEqual( + cc.encrypt("password", rounds=1999, salt="nacl"), + '$5$rounds=2000$nacl$9/lTZ5nrfPuz8vphznnmHuDGFuvjSNvOEDsGmGfsS97', + ) + self.consumeWarningList(wlog, PasslibConfigWarning) + + self.assertEqual( + cc.encrypt("password", rounds=2001, salt="nacl"), + '$5$rounds=2001$nacl$8PdeoPL4aXQnJ0woHhqgIw/efyfCKC2WHneOpnvF.31' + ) + self.consumeWarningList(wlog) + + # max rounds, etc tested in genconfig() + + # make default > max throws error if attempted + self.assertRaises(ValueError, cc.replace, + sha256_crypt__default_rounds=4000) + + def test_12_hash_needs_update(self): + "test hash_needs_update() method" + cc = CryptContext(**self.sample_policy_1) + + #check deprecated scheme + self.assertTrue(cc.hash_needs_update('9XXD4trGYeGJA')) + self.assertFalse(cc.hash_needs_update('$1$J8HC2RCr$HcmM.7NxB2weSvlw2FgzU0')) + + #check min rounds + self.assertTrue(cc.hash_needs_update('$5$rounds=1999$jD81UCoo.zI.UETs$Y7qSTQ6mTiU9qZB4fRr43wRgQq4V.5AAf7F97Pzxey/')) + self.assertFalse(cc.hash_needs_update('$5$rounds=2000$228SSRje04cnNCaQ$YGV4RYu.5sNiBvorQDlO0WWQjyJVGKBcJXz3OtyQ2u8')) + + #check max rounds + self.assertFalse(cc.hash_needs_update('$5$rounds=3000$fS9iazEwTKi7QPW4$VasgBC8FqlOvD7x2HhABaMXCTh9jwHclPA9j5YQdns.')) + self.assertTrue(cc.hash_needs_update('$5$rounds=3001$QlFHHifXvpFX4PLs$/0ekt7lSs/lOikSerQ0M/1porEHxYq7W/2hdFpxA3fA')) + + #========================================================= + #identify + #========================================================= + def test_20_basic(self): + "test basic encrypt/identify/verify functionality" + handlers = [hash.md5_crypt, hash.des_crypt, hash.bsdi_crypt] + cc = CryptContext(handlers, policy=None) + + #run through handlers + for crypt in handlers: + h = cc.encrypt("test", scheme=crypt.name) + self.assertEqual(cc.identify(h), crypt.name) + self.assertEqual(cc.identify(h, resolve=True), crypt) + self.assertTrue(cc.verify('test', h)) + self.assertTrue(not cc.verify('notest', h)) + + #test default + h = cc.encrypt("test") + self.assertEqual(cc.identify(h), "md5_crypt") + + #test genhash + h = cc.genhash('secret', cc.genconfig()) + self.assertEqual(cc.identify(h), 'md5_crypt') + + h = cc.genhash('secret', cc.genconfig(), scheme='md5_crypt') + self.assertEqual(cc.identify(h), 'md5_crypt') + + self.assertRaises(ValueError, cc.genhash, 'secret', cc.genconfig(), scheme="des_crypt") + + def test_21_identify(self): + "test identify() border cases" + handlers = ["md5_crypt", "des_crypt", "bsdi_crypt"] + cc = CryptContext(handlers, policy=None) + + #check unknown hash + self.assertEqual(cc.identify('$9$232323123$1287319827'), None) + self.assertRaises(ValueError, cc.identify, '$9$232323123$1287319827', required=True) + + def test_22_verify(self): + "test verify() scheme kwd" + handlers = ["md5_crypt", "des_crypt", "bsdi_crypt"] + cc = CryptContext(handlers, policy=None) + + h = hash.md5_crypt.encrypt("test") + + #check base verify + self.assertTrue(cc.verify("test", h)) + self.assertTrue(not cc.verify("notest", h)) + + #check verify using right alg + self.assertTrue(cc.verify('test', h, scheme='md5_crypt')) + self.assertTrue(not cc.verify('notest', h, scheme='md5_crypt')) + + #check verify using wrong alg + self.assertRaises(ValueError, cc.verify, 'test', h, scheme='bsdi_crypt') + + def test_24_min_verify_time(self): + "test verify() honors min_verify_time" + #NOTE: this whole test assumes time.sleep() and tick() + # have better than 100ms accuracy - set via delta. + delta = .05 + min_delay = 2*delta + min_verify_time = 5*delta + max_delay = 8*delta + + class TimedHash(uh.StaticHandler): + "psuedo hash that takes specified amount of time" + name = "timed_hash" + delay = 0 + + @classmethod + def identify(cls, hash): + return True + + def _calc_checksum(self, secret): + time.sleep(self.delay) + return to_unicode(secret + 'x') + + # silence deprecation warnings for min verify time + with catch_warnings(record=True) as wlog: + cc = CryptContext([TimedHash], min_verify_time=min_verify_time) + self.consumeWarningList(wlog, DeprecationWarning) + + def timecall(func, *args, **kwds): + start = tick() + result = func(*args, **kwds) + end = tick() + return end-start, result + + #verify genhash delay works + TimedHash.delay = min_delay + elapsed, result = timecall(TimedHash.genhash, 'stub', None) + self.assertEqual(result, 'stubx') + self.assertAlmostEqual(elapsed, min_delay, delta=delta) + + #ensure min verify time is honored + elapsed, result = timecall(cc.verify, "stub", "stubx") + self.assertTrue(result) + self.assertAlmostEqual(elapsed, min_delay, delta=delta) + + elapsed, result = timecall(cc.verify, "blob", "stubx") + self.assertFalse(result) + self.assertAlmostEqual(elapsed, min_verify_time, delta=delta) + + #ensure taking longer emits a warning. + TimedHash.delay = max_delay + with catch_warnings(record=True) as wlog: + elapsed, result = timecall(cc.verify, "blob", "stubx") + self.assertFalse(result) + self.assertAlmostEqual(elapsed, max_delay, delta=delta) + self.consumeWarningList(wlog, ".*verify exceeded min_verify_time") + + def test_25_verify_and_update(self): + "test verify_and_update()" + cc = CryptContext(**self.sample_policy_1) + + #create some hashes + h1 = cc.encrypt("password", scheme="des_crypt") + h2 = cc.encrypt("password", scheme="sha256_crypt") + + #check bad password, deprecated hash + ok, new_hash = cc.verify_and_update("wrongpass", h1) + self.assertFalse(ok) + self.assertIs(new_hash, None) + + #check bad password, good hash + ok, new_hash = cc.verify_and_update("wrongpass", h2) + self.assertFalse(ok) + self.assertIs(new_hash, None) + + #check right password, deprecated hash + ok, new_hash = cc.verify_and_update("password", h1) + self.assertTrue(ok) + self.assertTrue(cc.identify(new_hash), "sha256_crypt") + + #check right password, good hash + ok, new_hash = cc.verify_and_update("password", h2) + self.assertTrue(ok) + self.assertIs(new_hash, None) + + #========================================================= + # border cases + #========================================================= + def test_30_nonstring_hash(self): + "test non-string hash values cause error" + # + # test hash=None or some other non-string causes TypeError + # and that explicit-scheme code path behaves the same. + # + cc = CryptContext(["des_crypt"]) + for hash, kwds in [ + (None, {}), + (None, {"scheme": "des_crypt"}), + (1, {}), + ((), {}), + ]: + + self.assertRaises(TypeError, cc.identify, hash, **kwds) + self.assertRaises(TypeError, cc.genhash, 'stub', hash, **kwds) + self.assertRaises(TypeError, cc.verify, 'stub', hash, **kwds) + self.assertRaises(TypeError, cc.verify_and_update, 'stub', hash, **kwds) + self.assertRaises(TypeError, cc.hash_needs_update, hash, **kwds) + + # + # but genhash *should* accept None if default scheme lacks config string. + # + cc2 = CryptContext(["mysql323"]) + self.assertRaises(TypeError, cc2.identify, None) + self.assertIsInstance(cc2.genhash("stub", None), str) + self.assertRaises(TypeError, cc2.verify, 'stub', None) + self.assertRaises(TypeError, cc2.verify_and_update, 'stub', None) + self.assertRaises(TypeError, cc2.hash_needs_update, None) + + + def test_31_nonstring_secret(self): + "test non-string password values cause error" + cc = CryptContext(["des_crypt"]) + hash = cc.encrypt("stub") + # + # test secret=None, or some other non-string causes TypeError + # + for secret, kwds in [ + (None, {}), + (None, {"scheme": "des_crypt"}), + (1, {}), + ((), {}), + ]: + self.assertRaises(TypeError, cc.encrypt, secret, **kwds) + self.assertRaises(TypeError, cc.genhash, secret, hash, **kwds) + self.assertRaises(TypeError, cc.verify, secret, hash, **kwds) + self.assertRaises(TypeError, cc.verify_and_update, secret, hash, **kwds) + + #========================================================= + # other + #========================================================= + def test_90_bcrypt_normhash(self): + "teset verify_and_update / hash_needs_update corrects bcrypt padding" + # see issue 25. + bcrypt = hash.bcrypt + + PASS1 = "loppux" + BAD1 = "$2a$12$oaQbBqq8JnSM1NHRPQGXORm4GCUMqp7meTnkft4zgSnrbhoKdDV0C" + GOOD1 = "$2a$12$oaQbBqq8JnSM1NHRPQGXOOm4GCUMqp7meTnkft4zgSnrbhoKdDV0C" + ctx = CryptContext(["bcrypt"]) + + with catch_warnings(record=True) as wlog: + self.assertTrue(ctx.hash_needs_update(BAD1)) + self.assertFalse(ctx.hash_needs_update(GOOD1)) + + if bcrypt.has_backend(): + self.assertEqual(ctx.verify_and_update(PASS1,GOOD1), (True,None)) + self.assertEqual(ctx.verify_and_update("x",BAD1), (False,None)) + res = ctx.verify_and_update(PASS1, BAD1) + self.assertTrue(res[0] and res[1] and res[1] != BAD1) + + #========================================================= + #eoc + #========================================================= + +#========================================================= +#LazyCryptContext +#========================================================= +class dummy_2(uh.StaticHandler): + name = "dummy_2" + +class LazyCryptContextTest(TestCase): + descriptionPrefix = "LazyCryptContext" + + def setUp(self): + # make sure this isn't registered before OR after + unload_handler_name("dummy_2") + self.addCleanup(unload_handler_name, "dummy_2") + + # silence some warnings + warnings.filterwarnings("ignore", + r"CryptContext\(\)\.replace\(\) has been deprecated") + warnings.filterwarnings("ignore", ".*(CryptPolicy|context\.policy).*(has|have) been deprecated.*") + + def test_kwd_constructor(self): + "test plain kwds" + self.assertFalse(has_crypt_handler("dummy_2")) + register_crypt_handler_path("dummy_2", "passlib.tests.test_context") + + cc = LazyCryptContext(iter(["dummy_2", "des_crypt"]), deprecated=["des_crypt"]) + + self.assertFalse(has_crypt_handler("dummy_2", True)) + + self.assertTrue(cc.policy.handler_is_deprecated("des_crypt")) + self.assertEqual(cc.policy.schemes(), ["dummy_2", "des_crypt"]) + + self.assertTrue(has_crypt_handler("dummy_2", True)) + + def test_callable_constructor(self): + "test create_policy() hook, returning CryptPolicy" + self.assertFalse(has_crypt_handler("dummy_2")) + register_crypt_handler_path("dummy_2", "passlib.tests.test_context") + + def create_policy(flag=False): + self.assertTrue(flag) + return CryptPolicy(schemes=iter(["dummy_2", "des_crypt"]), deprecated=["des_crypt"]) + + cc = LazyCryptContext(create_policy=create_policy, flag=True) + + self.assertFalse(has_crypt_handler("dummy_2", True)) + + self.assertTrue(cc.policy.handler_is_deprecated("des_crypt")) + self.assertEqual(cc.policy.schemes(), ["dummy_2", "des_crypt"]) + + self.assertTrue(has_crypt_handler("dummy_2", True)) + +#========================================================= +#EOF +#========================================================= |