diff options
author | Eli Collins <elic@assurancetechnologies.com> | 2012-04-17 23:14:51 -0400 |
---|---|---|
committer | Eli Collins <elic@assurancetechnologies.com> | 2012-04-17 23:14:51 -0400 |
commit | 64ab6fc89b497efa9169f11d55251e417c4db0ba (patch) | |
tree | b3f6f5dc27b87a6bc90cb3686fa98239ee8ff053 /passlib/tests/test_context.py | |
parent | 8eb4c4d3b58eec6802c698ddbf357b2fd243a68c (diff) | |
parent | cd029846fdc0c3d7ffc7f53caad4579e7e0e8725 (diff) | |
download | passlib-ironpython-support-dev.tar.gz |
Merge from defaultironpython-support-dev
Diffstat (limited to 'passlib/tests/test_context.py')
-rw-r--r-- | passlib/tests/test_context.py | 1737 |
1 files changed, 974 insertions, 763 deletions
diff --git a/passlib/tests/test_context.py b/passlib/tests/test_context.py index d1e4511..d039ac6 100644 --- a/passlib/tests/test_context.py +++ b/passlib/tests/test_context.py @@ -1,9 +1,14 @@ -"""tests for passlib.pwhash -- (c) Assurance Technologies 2003-2009""" +"""tests for passlib.context""" #========================================================= #imports #========================================================= from __future__ import with_statement +from passlib.utils.compat import PY3 #core +if PY3: + from configparser import NoSectionError +else: + from ConfigParser import NoSectionError import hashlib from logging import getLogger import os @@ -17,7 +22,7 @@ except ImportError: resource_filename = None #pkg from passlib import hash -from passlib.context import CryptContext, CryptPolicy, LazyCryptContext +from passlib.context import CryptContext, LazyCryptContext from passlib.exc import PasslibConfigWarning from passlib.utils import tick, to_bytes, to_unicode from passlib.utils.compat import irange, u @@ -25,90 +30,105 @@ import passlib.utils.handlers as uh from passlib.tests.utils import TestCase, mktemp, catch_warnings, \ gae_env, set_file from passlib.registry import register_crypt_handler_path, has_crypt_handler, \ - _unload_handler_name as unload_handler_name + _unload_handler_name as unload_handler_name, get_crypt_handler #module log = getLogger(__name__) +#========================================================= +# support +#========================================================= +here = os.path.abspath(os.path.dirname(__file__)) + +def merge_dicts(first, *args, **kwds): + target = first.copy() + for arg in args: + target.update(arg) + if kwds: + target.update(kwds) + return target #========================================================= # #========================================================= -class CryptPolicyTest(TestCase): - "test CryptPolicy object" - - #TODO: need to test user categories w/in all this +class CryptContextTest(TestCase): + descriptionPrefix = "CryptContext" - descriptionPrefix = "CryptPolicy" + # TODO: these unittests could really use a good cleanup + # and reorganizing, to ensure they're getting everything. #========================================================= - #sample crypt policies used for testing + # sample configurations used in tests #========================================================= #----------------------------------------------------- - #sample 1 - average config file + # sample 1 - typical configuration #----------------------------------------------------- - #NOTE: copy of this is stored in file passlib/tests/sample_config_1s.cfg - sample_config_1s = """\ -[passlib] -schemes = des_crypt, md5_crypt, bsdi_crypt, sha512_crypt -default = md5_crypt -all.vary_rounds = 10%% -bsdi_crypt.max_rounds = 30000 -bsdi_crypt.default_rounds = 25000 -sha512_crypt.max_rounds = 50000 -sha512_crypt.min_rounds = 40000 -""" - sample_config_1s_path = os.path.abspath(os.path.join( - os.path.dirname(__file__), "sample_config_1s.cfg")) - if not os.path.exists(sample_config_1s_path) and resource_filename: - #in case we're zipped up in an egg. - sample_config_1s_path = resource_filename("passlib.tests", - "sample_config_1s.cfg") - - #make sure sample_config_1s uses \n linesep - tests rely on this - assert sample_config_1s.startswith("[passlib]\nschemes") - - sample_config_1pd = dict( - schemes = [ "des_crypt", "md5_crypt", "bsdi_crypt", "sha512_crypt"], + sample_1_schemes = ["des_crypt", "md5_crypt", "bsdi_crypt", "sha512_crypt"] + sample_1_handlers = [get_crypt_handler(name) for name in sample_1_schemes] + + sample_1_dict = dict( + schemes = sample_1_schemes, default = "md5_crypt", - all__vary_rounds = "10%", + all__vary_rounds = 0.1, bsdi_crypt__max_rounds = 30000, bsdi_crypt__default_rounds = 25000, sha512_crypt__max_rounds = 50000, sha512_crypt__min_rounds = 40000, ) - sample_config_1pid = { - "schemes": "des_crypt, md5_crypt, bsdi_crypt, sha512_crypt", - "default": "md5_crypt", - "all.vary_rounds": "10%", - "bsdi_crypt.max_rounds": 30000, - "bsdi_crypt.default_rounds": 25000, - "sha512_crypt.max_rounds": 50000, - "sha512_crypt.min_rounds": 40000, - } - - sample_config_1prd = dict( - schemes = [ hash.des_crypt, hash.md5_crypt, hash.bsdi_crypt, hash.sha512_crypt], - default = "md5_crypt", # NOTE: passlib <= 1.5 was handler obj. - all__vary_rounds = "10%", - bsdi_crypt__max_rounds = 30000, - bsdi_crypt__default_rounds = 25000, - sha512_crypt__max_rounds = 50000, - sha512_crypt__min_rounds = 40000, - ) + sample_1_resolved_dict = merge_dicts(sample_1_dict, + schemes = sample_1_handlers) + + sample_1_unnormalized = u("""\ +[passlib] +schemes = des_crypt, md5_crypt, bsdi_crypt, sha512_crypt +default = md5_crypt +; this is using %... +all__vary_rounds = 10%% +; this is using 'rounds' instead of 'default_rounds' +bsdi_crypt__rounds = 25000 +bsdi_crypt__max_rounds = 30000 +sha512_crypt__max_rounds = 50000 +sha512_crypt__min_rounds = 40000 +""") + + sample_1_unicode = u("""\ +[passlib] +schemes = des_crypt, md5_crypt, bsdi_crypt, sha512_crypt +default = md5_crypt +all__vary_rounds = 0.1 +bsdi_crypt__default_rounds = 25000 +bsdi_crypt__max_rounds = 30000 +sha512_crypt__max_rounds = 50000 +sha512_crypt__min_rounds = 40000 + +""") #----------------------------------------------------- - #sample 2 - partial policy & result of overlay on sample 1 + # sample 1 external files #----------------------------------------------------- - sample_config_2s = """\ -[passlib] -bsdi_crypt.min_rounds = 29000 -bsdi_crypt.max_rounds = 35000 -bsdi_crypt.default_rounds = 31000 -sha512_crypt.min_rounds = 45000 -""" - sample_config_2pd = dict( + # sample 1 string with '\n' linesep + sample_1_path = os.path.join(here, "sample1.cfg") + + # sample 1 with '\r\n' linesep + sample_1b_unicode = sample_1_unicode.replace(u("\n"), u("\r\n")) + sample_1b_path = os.path.join(here, "sample1b.cfg") + + # sample 1 using UTF-16 and alt section + sample_1c_bytes = sample_1_unicode.replace(u("[passlib]"), + u("[mypolicy]")).encode("utf-16") + sample_1c_path = os.path.join(here, "sample1c.cfg") + + # enable to regenerate sample files + if False: + set_file(sample_1_path, sample_1_unicode) + set_file(sample_1b_path, sample_1b_unicode) + set_file(sample_1c_path, sample_1c_bytes) + + #----------------------------------------------------- + # sample 2 & 12 - options patch + #----------------------------------------------------- + sample_2_dict = dict( #using this to test full replacement of existing options bsdi_crypt__min_rounds = 29000, bsdi_crypt__max_rounds = 35000, @@ -117,511 +137,891 @@ sha512_crypt.min_rounds = 45000 sha512_crypt__min_rounds=45000, ) - sample_config_12pd = dict( - schemes = [ "des_crypt", "md5_crypt", "bsdi_crypt", "sha512_crypt"], - default = "md5_crypt", - all__vary_rounds = "10%", - bsdi_crypt__min_rounds = 29000, - bsdi_crypt__max_rounds = 35000, - bsdi_crypt__default_rounds = 31000, - sha512_crypt__max_rounds = 50000, - sha512_crypt__min_rounds=45000, - ) + sample_2_unicode = """\ +[passlib] +bsdi_crypt__min_rounds = 29000 +bsdi_crypt__max_rounds = 35000 +bsdi_crypt__default_rounds = 31000 +sha512_crypt__min_rounds = 45000 +""" + + # sample 2 overlayed on top of sample 1 + sample_12_dict = merge_dicts(sample_1_dict, sample_2_dict) #----------------------------------------------------- - #sample 3 - just changing default + # sample 3 & 123 - just changing default from sample 1 #----------------------------------------------------- - sample_config_3pd = dict( + sample_3_dict = dict( default="sha512_crypt", ) - sample_config_123pd = dict( - schemes = [ "des_crypt", "md5_crypt", "bsdi_crypt", "sha512_crypt"], - default = "sha512_crypt", - all__vary_rounds = "10%", - bsdi_crypt__min_rounds = 29000, - bsdi_crypt__max_rounds = 35000, - bsdi_crypt__default_rounds = 31000, - sha512_crypt__max_rounds = 50000, - sha512_crypt__min_rounds=45000, - ) + # sample 3 overlayed on 2 overlayed on 1 + sample_123_dict = merge_dicts(sample_12_dict, sample_3_dict) #----------------------------------------------------- - #sample 4 - category specific + # sample 4 - used by api tests #----------------------------------------------------- - sample_config_4s = """ -[passlib] -schemes = sha512_crypt -all.vary_rounds = 10%% -default.sha512_crypt.max_rounds = 20000 -admin.all.vary_rounds = 5%% -admin.sha512_crypt.max_rounds = 40000 -""" + sample_4_dict = dict( + schemes = [ "des_crypt", "md5_crypt", "phpass", "bsdi_crypt", + "sha256_crypt"], + deprecated = [ "des_crypt", ], + default = "sha256_crypt", + bsdi_crypt__max_rounds = 30, + bsdi_crypt__default_rounds = 25, + bsdi_crypt__vary_rounds = 0, + sha256_crypt__max_rounds = 3000, + sha256_crypt__min_rounds = 2000, + sha256_crypt__default_rounds = 3000, + phpass__ident = "H", + phpass__default_rounds = 7, + ) - sample_config_4pd = dict( - schemes = [ "sha512_crypt" ], - all__vary_rounds = "10%", - sha512_crypt__max_rounds = 20000, - admin__all__vary_rounds = "5%", - admin__sha512_crypt__max_rounds = 40000, - ) + #========================================================= + # constructors + #========================================================= + def test_01_constructor(self): + "test class constructor" - #----------------------------------------------------- - #sample 5 - to_string & deprecation testing - #----------------------------------------------------- - sample_config_5s = sample_config_1s + """\ -deprecated = des_crypt -admin__context__deprecated = des_crypt, bsdi_crypt -""" + # test blank constructor works correctly + ctx = CryptContext() + self.assertEqual(ctx.to_dict(), {}) - sample_config_5pd = sample_config_1pd.copy() - sample_config_5pd.update( - deprecated = [ "des_crypt" ], - admin__context__deprecated = [ "des_crypt", "bsdi_crypt" ], - ) + # test sample 1 with scheme=names + ctx = CryptContext(**self.sample_1_dict) + self.assertEqual(ctx.to_dict(), self.sample_1_dict) + + # test sample 1 with scheme=handlers + ctx = CryptContext(**self.sample_1_resolved_dict) + self.assertEqual(ctx.to_dict(), self.sample_1_dict) + + # test sample 2: options w/o schemes + ctx = CryptContext(**self.sample_2_dict) + self.assertEqual(ctx.to_dict(), self.sample_2_dict) - sample_config_5pid = sample_config_1pid.copy() - sample_config_5pid.update({ - "deprecated": "des_crypt", - "admin.context.deprecated": "des_crypt, bsdi_crypt", - }) + # test sample 3: default only + ctx = CryptContext(**self.sample_3_dict) + self.assertEqual(ctx.to_dict(), self.sample_3_dict) - sample_config_5prd = sample_config_1prd.copy() - sample_config_5prd.update({ - # XXX: should deprecated return the actual handlers in this case? - # would have to modify how policy stores info, for one. - "deprecated": ["des_crypt"], - "admin__context__deprecated": ["des_crypt", "bsdi_crypt"], - }) + def test_02_from_string(self): + "test from_string() constructor" + # test sample 1 unicode + ctx = CryptContext.from_string(self.sample_1_unicode) + self.assertEqual(ctx.to_dict(), self.sample_1_dict) + + # test sample 1 with unnormalized inputs + ctx = CryptContext.from_string(self.sample_1_unnormalized) + self.assertEqual(ctx.to_dict(), self.sample_1_dict) + + # test sample 1 utf-8 + ctx = CryptContext.from_string(self.sample_1_unicode.encode("utf-8")) + self.assertEqual(ctx.to_dict(), self.sample_1_dict) + + # test sample 1 w/ '\r\n' linesep + ctx = CryptContext.from_string(self.sample_1b_unicode) + self.assertEqual(ctx.to_dict(), self.sample_1_dict) + + # test sample 1 using UTF-16 and alt section + ctx = CryptContext.from_string(self.sample_1c_bytes, section="mypolicy", + encoding="utf-16") + self.assertEqual(ctx.to_dict(), self.sample_1_dict) + + # test wrong type + self.assertRaises(TypeError, CryptContext.from_string, None) + + # test missing section + self.assertRaises(NoSectionError, CryptContext.from_string, + self.sample_1_unicode, section="fakesection") + + def test_03_from_path(self): + "test from_path() constructor" + # make sure sample files exist + if not os.path.exists(self.sample_1_path): + raise RuntimeError("can't find data file: %r" % self.sample_1_path) + + # test sample 1 + ctx = CryptContext.from_path(self.sample_1_path) + self.assertEqual(ctx.to_dict(), self.sample_1_dict) + + # test sample 1 w/ '\r\n' linesep + ctx = CryptContext.from_path(self.sample_1b_path) + self.assertEqual(ctx.to_dict(), self.sample_1_dict) + + # test sample 1 encoding using UTF-16 and alt section + ctx = CryptContext.from_path(self.sample_1c_path, section="mypolicy", + encoding="utf-16") + self.assertEqual(ctx.to_dict(), self.sample_1_dict) + + # test missing file + self.assertRaises(EnvironmentError, CryptContext.from_path, + os.path.join(here, "sample1xxx.cfg")) + + # test missing section + self.assertRaises(NoSectionError, CryptContext.from_path, + self.sample_1_path, section="fakesection") + + def test_04_copy(self): + "test copy() method" + cc1 = CryptContext(**self.sample_1_dict) + + # overlay sample 2 onto copy + cc2 = cc1.copy(**self.sample_2_dict) + self.assertEqual(cc1.to_dict(), self.sample_1_dict) + self.assertEqual(cc2.to_dict(), self.sample_12_dict) + + # check that repeating overlay makes no change + cc2b = cc2.copy(**self.sample_2_dict) + self.assertEqual(cc1.to_dict(), self.sample_1_dict) + self.assertEqual(cc2b.to_dict(), self.sample_12_dict) + + # overlay sample 3 on copy + cc3 = cc2.copy(**self.sample_3_dict) + self.assertEqual(cc3.to_dict(), self.sample_123_dict) + + # test empty copy creates separate copy + cc4 = cc1.copy() + self.assertIsNot(cc4, cc1) + self.assertEqual(cc1.to_dict(), self.sample_1_dict) + self.assertEqual(cc4.to_dict(), self.sample_1_dict) + + # ... and that modifying copy doesn't affect original + cc4.update(**self.sample_2_dict) + self.assertEqual(cc1.to_dict(), self.sample_1_dict) + self.assertEqual(cc4.to_dict(), self.sample_12_dict) #========================================================= - #constructors + # modifiers #========================================================= - def test_00_constructor(self): - "test CryptPolicy() constructor" - policy = CryptPolicy(**self.sample_config_1pd) - self.assertEqual(policy.to_dict(), self.sample_config_1pd) - - #check key with too many separators is rejected - self.assertRaises(TypeError, CryptPolicy, - schemes = [ "des_crypt", "md5_crypt", "bsdi_crypt", "sha512_crypt"], - bad__key__bsdi_crypt__max_rounds = 30000, + def test_10_load(self): + "test load() / load_path() method" + # NOTE: load() is the workhorse that handles all policy parsing, + # compilation, and validation. most of it's features are tested + # elsewhere, since all the constructors and modifiers are just + # wrappers for it. + + # source_type 'auto' + ctx = CryptContext() + + # detect dict + ctx.load(self.sample_1_dict) + self.assertEqual(ctx.to_dict(), self.sample_1_dict) + + # detect unicode string + ctx.load(self.sample_1_unicode) + self.assertEqual(ctx.to_dict(), self.sample_1_dict) + + # detect bytes string + ctx.load(self.sample_1_unicode.encode("utf-8")) + self.assertEqual(ctx.to_dict(), self.sample_1_dict) + + # anything else - TypeError + self.assertRaises(TypeError, ctx.load, None) + + # NOTE: load_path() tested by from_path() + # NOTE: additional string tests done by from_string() + + # update flag - tested by update() method tests + # encoding keyword - tested by from_string() & from_path() + # section keyword - tested by from_string() & from_path() + + # multiple loads should clear the state + ctx = CryptContext() + ctx.load(self.sample_1_dict) + ctx.load(self.sample_2_dict) + self.assertEqual(ctx.to_dict(), self.sample_2_dict) + + def test_11_load_rollback(self): + "test load() errors restore old state" + # create initial context + cc = CryptContext(["des_crypt", "sha256_crypt"], + sha256_crypt__default_rounds=5000, + all__vary_rounds=0.1, ) + result = cc.to_string() - #check nameless handler rejected - class nameless(uh.StaticHandler): - name = None - self.assertRaises(ValueError, CryptPolicy, schemes=[nameless]) + # do an update operation that should fail during parsing + # XXX: not sure what the right error type is here. + self.assertRaises(TypeError, cc.update, too__many__key__parts=True) + self.assertEqual(cc.to_string(), result) - # check scheme must be name or crypt handler - self.assertRaises(TypeError, CryptPolicy, schemes=[uh.StaticHandler]) + # do an update operation that should fail during extraction + # FIXME: this isn't failing even in broken case, need to figure out + # way to ensure some keys come after this one. + self.assertRaises(KeyError, cc.update, fake_context_option=True) + self.assertEqual(cc.to_string(), result) - #check name conflicts are rejected - class dummy_1(uh.StaticHandler): - name = 'dummy_1' - self.assertRaises(KeyError, CryptPolicy, schemes=[dummy_1, dummy_1]) + # do an update operation that should fail during compilation + self.assertRaises(ValueError, cc.update, sha256_crypt__min_rounds=10000) + self.assertEqual(cc.to_string(), result) - #with unknown deprecated value - self.assertRaises(KeyError, CryptPolicy, - schemes=['des_crypt'], - deprecated=['md5_crypt']) + def test_12_update(self): + "test update() method" - #with unknown default value - self.assertRaises(KeyError, CryptPolicy, - schemes=['des_crypt'], - default='md5_crypt') + # empty overlay + ctx = CryptContext(**self.sample_1_dict) + ctx.update() + self.assertEqual(ctx.to_dict(), self.sample_1_dict) - def test_01_from_path_simple(self): - "test CryptPolicy.from_path() constructor" - #NOTE: this is separate so it can also run under GAE + # test basic overlay + ctx = CryptContext(**self.sample_1_dict) + ctx.update(**self.sample_2_dict) + self.assertEqual(ctx.to_dict(), self.sample_12_dict) - #test preset stored in existing file - path = self.sample_config_1s_path - policy = CryptPolicy.from_path(path) - self.assertEqual(policy.to_dict(), self.sample_config_1pd) + # ... and again + ctx.update(**self.sample_3_dict) + self.assertEqual(ctx.to_dict(), self.sample_123_dict) - #test if path missing - self.assertRaises(EnvironmentError, CryptPolicy.from_path, path + 'xxx') + # overlay w/ dict arg + ctx = CryptContext(**self.sample_1_dict) + ctx.update(self.sample_2_dict) + self.assertEqual(ctx.to_dict(), self.sample_12_dict) - def test_01_from_path(self): - "test CryptPolicy.from_path() constructor with encodings" - if gae_env: - return self.skipTest("GAE doesn't offer read/write filesystem access") + # overlay w/ string + ctx = CryptContext(**self.sample_1_dict) + ctx.update(self.sample_2_unicode) + self.assertEqual(ctx.to_dict(), self.sample_12_dict) - path = mktemp() + # too many args + self.assertRaises(TypeError, ctx.update, {}, {}) + self.assertRaises(TypeError, ctx.update, {}, schemes=['des_crypt']) - #test "\n" linesep - set_file(path, self.sample_config_1s) - policy = CryptPolicy.from_path(path) - self.assertEqual(policy.to_dict(), self.sample_config_1pd) + # wrong arg type + self.assertRaises(TypeError, ctx.update, None) - #test "\r\n" linesep - set_file(path, self.sample_config_1s.replace("\n","\r\n")) - policy = CryptPolicy.from_path(path) - self.assertEqual(policy.to_dict(), self.sample_config_1pd) + #========================================================= + # option parsing + #========================================================= + def test_20_options(self): + "test basic option parsing" + def parse(**kwds): + return CryptContext(**kwds).to_dict() - #test with custom encoding - uc2 = to_bytes(self.sample_config_1s, "utf-16", source_encoding="utf-8") - set_file(path, uc2) - policy = CryptPolicy.from_path(path, encoding="utf-16") - self.assertEqual(policy.to_dict(), self.sample_config_1pd) + # + # common option parsing tests + # - def test_02_from_string(self): - "test CryptPolicy.from_string() constructor" - #test "\n" linesep - policy = CryptPolicy.from_string(self.sample_config_1s) - self.assertEqual(policy.to_dict(), self.sample_config_1pd) - - #test "\r\n" linesep - policy = CryptPolicy.from_string( - self.sample_config_1s.replace("\n","\r\n")) - self.assertEqual(policy.to_dict(), self.sample_config_1pd) - - #test with unicode - data = to_unicode(self.sample_config_1s) - policy = CryptPolicy.from_string(data) - self.assertEqual(policy.to_dict(), self.sample_config_1pd) - - #test with non-ascii-compatible encoding - uc2 = to_bytes(self.sample_config_1s, "utf-16", source_encoding="utf-8") - policy = CryptPolicy.from_string(uc2, encoding="utf-16") - self.assertEqual(policy.to_dict(), self.sample_config_1pd) - - #test category specific options - policy = CryptPolicy.from_string(self.sample_config_4s) - self.assertEqual(policy.to_dict(), self.sample_config_4pd) - - def test_03_from_source(self): - "test CryptPolicy.from_source() constructor" - #pass it a path - policy = CryptPolicy.from_source(self.sample_config_1s_path) - self.assertEqual(policy.to_dict(), self.sample_config_1pd) - - #pass it a string - policy = CryptPolicy.from_source(self.sample_config_1s) - self.assertEqual(policy.to_dict(), self.sample_config_1pd) - - #pass it a dict (NOTE: make a copy to detect in-place modifications) - policy = CryptPolicy.from_source(self.sample_config_1pd.copy()) - self.assertEqual(policy.to_dict(), self.sample_config_1pd) - - #pass it existing policy - p2 = CryptPolicy.from_source(policy) - self.assertIs(policy, p2) - - #pass it something wrong - self.assertRaises(TypeError, CryptPolicy.from_source, 1) - self.assertRaises(TypeError, CryptPolicy.from_source, []) - - def test_04_from_sources(self): - "test CryptPolicy.from_sources() constructor" - - #pass it empty list - self.assertRaises(ValueError, CryptPolicy.from_sources, []) - - #pass it one-element list - policy = CryptPolicy.from_sources([self.sample_config_1s]) - self.assertEqual(policy.to_dict(), self.sample_config_1pd) - - #pass multiple sources - policy = CryptPolicy.from_sources( - [ - self.sample_config_1s_path, - self.sample_config_2s, - self.sample_config_3pd, - ]) - self.assertEqual(policy.to_dict(), self.sample_config_123pd) - - def test_05_replace(self): - "test CryptPolicy.replace() constructor" - - p1 = CryptPolicy(**self.sample_config_1pd) - - #check overlaying sample 2 - p2 = p1.replace(**self.sample_config_2pd) - self.assertEqual(p2.to_dict(), self.sample_config_12pd) - - #check repeating overlay makes no change - p2b = p2.replace(**self.sample_config_2pd) - self.assertEqual(p2b.to_dict(), self.sample_config_12pd) - - #check overlaying sample 3 - p3 = p2.replace(self.sample_config_3pd) - self.assertEqual(p3.to_dict(), self.sample_config_123pd) - - def test_06_forbidden(self): - "test CryptPolicy() forbidden kwds" - - #salt not allowed to be set - self.assertRaises(KeyError, CryptPolicy, - schemes=["des_crypt"], - des_crypt__salt="xx", - ) - self.assertRaises(KeyError, CryptPolicy, - schemes=["des_crypt"], - all__salt="xx", - ) + # test key with blank separators is rejected + self.assertRaises(TypeError, CryptContext, __=0.1) + self.assertRaises(TypeError, CryptContext, __default='x') + self.assertRaises(TypeError, CryptContext, default____default='x') + self.assertRaises(TypeError, CryptContext, __default____default='x') - #schemes not allowed for category - self.assertRaises(KeyError, CryptPolicy, - schemes=["des_crypt"], - user__context__schemes=["md5_crypt"], - ) + # test key with too many separators is rejected + self.assertRaises(TypeError, CryptContext, + category__scheme__option__invalid = 30000) - #========================================================= - #reading - #========================================================= - def test_10_has_schemes(self): - "test has_schemes() method" + # + # context option -specific tests + # - p1 = CryptPolicy(**self.sample_config_1pd) - self.assertTrue(p1.has_schemes()) + # test context option key parsing + result = dict(default="md5_crypt") + self.assertEqual(parse(default="md5_crypt"), result) + self.assertEqual(parse(context__default="md5_crypt"), result) + self.assertEqual(parse(default__context__default="md5_crypt"), result) + self.assertEqual(parse(**{"context.default":"md5_crypt"}), result) + self.assertEqual(parse(**{"default.context.default":"md5_crypt"}), result) - p3 = CryptPolicy(**self.sample_config_3pd) - self.assertTrue(not p3.has_schemes()) + # test context option key parsing w/ category + result = dict(admin__context__default="md5_crypt") + self.assertEqual(parse(admin__context__default="md5_crypt"), result) + self.assertEqual(parse(**{"admin.context.default":"md5_crypt"}), result) - def test_11_iter_handlers(self): - "test iter_handlers() method" + # + # hash option -specific tests + # - p1 = CryptPolicy(**self.sample_config_1pd) - s = self.sample_config_1prd['schemes'] - self.assertEqual(list(p1.iter_handlers()), s) + # test hash option key parsing + result = dict(all__vary_rounds=0.1) + self.assertEqual(parse(all__vary_rounds=0.1), result) + self.assertEqual(parse(default__all__vary_rounds=0.1), result) + self.assertEqual(parse(**{"all.vary_rounds":0.1}), result) + self.assertEqual(parse(**{"default.all.vary_rounds":0.1}), result) - p3 = CryptPolicy(**self.sample_config_3pd) - self.assertEqual(list(p3.iter_handlers()), []) + # test hash option key parsing w/ category + result = dict(admin__all__vary_rounds=0.1) + self.assertEqual(parse(admin__all__vary_rounds=0.1), result) + self.assertEqual(parse(**{"admin.all.vary_rounds":0.1}), result) - def test_12_get_handler(self): - "test get_handler() method" + # settings not allowed if not in hash.settings_kwds + ctx = CryptContext(["phpass", "md5_crypt"], phpass__ident="P") + self.assertRaises(KeyError, ctx.copy, md5_crypt__ident="P") - p1 = CryptPolicy(**self.sample_config_1pd) + # hash options 'salt' and 'rounds' not allowed + self.assertRaises(KeyError, CryptContext, schemes=["des_crypt"], + des_crypt__salt="xx") + self.assertRaises(KeyError, CryptContext, schemes=["des_crypt"], + all__salt="xx") - #check by name - self.assertIs(p1.get_handler("bsdi_crypt"), hash.bsdi_crypt) + def test_21_schemes(self): + "test 'schemes' context option parsing" - #check by missing name - self.assertIs(p1.get_handler("sha256_crypt"), None) - self.assertRaises(KeyError, p1.get_handler, "sha256_crypt", required=True) + # schemes can be empty + cc = CryptContext(schemes=None) + self.assertEqual(cc.schemes(), ()) - #check default - self.assertIs(p1.get_handler(), hash.md5_crypt) + # schemes can be list of names + cc = CryptContext(schemes=["des_crypt", "md5_crypt"]) + self.assertEqual(cc.schemes(), ("des_crypt", "md5_crypt")) - def test_13_get_options(self): - "test get_options() method" + # schemes can be comma-sep string + cc = CryptContext(schemes=" des_crypt, md5_crypt, ") + self.assertEqual(cc.schemes(), ("des_crypt", "md5_crypt")) - p12 = CryptPolicy(**self.sample_config_12pd) + # schemes can be list of handlers + cc = CryptContext(schemes=[hash.des_crypt, hash.md5_crypt]) + self.assertEqual(cc.schemes(), ("des_crypt", "md5_crypt")) - self.assertEqual(p12.get_options("bsdi_crypt"),dict( - vary_rounds = "10%", - min_rounds = 29000, - max_rounds = 35000, - default_rounds = 31000, - )) + # scheme must be name or handler + self.assertRaises(TypeError, CryptContext, schemes=[uh.StaticHandler]) - self.assertEqual(p12.get_options("sha512_crypt"),dict( - vary_rounds = "10%", - min_rounds = 45000, - max_rounds = 50000, - )) + # handlers must have a name + class nameless(uh.StaticHandler): + name = None + self.assertRaises(ValueError, CryptContext, schemes=[nameless]) + + # names must be unique + class dummy_1(uh.StaticHandler): + name = 'dummy_1' + self.assertRaises(KeyError, CryptContext, schemes=[dummy_1, dummy_1]) + + # schemes not allowed per-category + self.assertRaises(KeyError, CryptContext, + admin__context__schemes=["md5_crypt"]) + + def test_22_deprecated(self): + "test 'deprecated' context option parsing" + def getdep(ctx, category=None): + return [name for name in ctx.schemes() + if ctx._is_deprecated_scheme(name, category)] + + # no schemes - all deprecated values allowed + cc = CryptContext(deprecated=["md5_crypt"]) + cc.update(schemes=["md5_crypt", "des_crypt"]) + self.assertEqual(getdep(cc),["md5_crypt"]) + + # deprecated values allowed if subset of schemes + cc = CryptContext(deprecated=["md5_crypt"], schemes=["md5_crypt", "des_crypt"]) + self.assertEqual(getdep(cc), ["md5_crypt"]) + + # can be handler + # XXX: allow handlers in deprecated list? not for now. + self.assertRaises(TypeError, CryptContext, deprecated=[hash.md5_crypt], + schemes=["md5_crypt", "des_crypt"]) +## cc = CryptContext(deprecated=[hash.md5_crypt], schemes=["md5_crypt", "des_crypt"]) +## self.assertEqual(getdep(cc), ["md5_crypt"]) + + # comma sep list + cc = CryptContext(deprecated="md5_crypt,des_crypt", schemes=["md5_crypt", "des_crypt"]) + self.assertEqual(getdep(cc), ["md5_crypt", "des_crypt"]) + + # values outside of schemes not allowed + self.assertRaises(KeyError, CryptContext, schemes=['des_crypt'], + deprecated=['md5_crypt']) + + # wrong type + self.assertRaises(TypeError, CryptContext, deprecated=123) + + # deprecated per-category + cc = CryptContext(deprecated=["md5_crypt"], + schemes=["md5_crypt", "des_crypt"], + admin__context__deprecated=["des_crypt"], + ) + self.assertEqual(getdep(cc), ["md5_crypt"]) + self.assertEqual(getdep(cc, "user"), ["md5_crypt"]) + self.assertEqual(getdep(cc, "admin"), ["des_crypt"]) + + def test_23_default(self): + "test 'default' context option parsing" + + # anything allowed if no schemes + self.assertEqual(CryptContext(default="md5_crypt").to_dict(), + dict(default="md5_crypt")) + + # default allowed if in scheme list + ctx = CryptContext(default="md5_crypt", schemes=["des_crypt", "md5_crypt"]) + self.assertEqual(ctx.default_scheme(), "md5_crypt") - p4 = CryptPolicy.from_string(self.sample_config_4s) - self.assertEqual(p4.get_options("sha512_crypt"), dict( - vary_rounds="10%", + # default can be handler + # XXX: sure we want to allow this ? maybe deprecate in future. + ctx = CryptContext(default=hash.md5_crypt, schemes=["des_crypt", "md5_crypt"]) + self.assertEqual(ctx.default_scheme(), "md5_crypt") + + # error if not in scheme list + self.assertRaises(KeyError, CryptContext, schemes=['des_crypt'], + default='md5_crypt') + + # wrong type + self.assertRaises(TypeError, CryptContext, default=1) + + # per-category + ctx = CryptContext(default="des_crypt", + schemes=["des_crypt", "md5_crypt"], + admin__context__default="md5_crypt") + self.assertEqual(ctx.default_scheme(), "des_crypt") + self.assertEqual(ctx.default_scheme("user"), "des_crypt") + self.assertEqual(ctx.default_scheme("admin"), "md5_crypt") + + def test_24_vary_rounds(self): + "test 'vary_rounds' hash option parsing" + def parse(v): + return CryptContext(all__vary_rounds=v).to_dict()['all__vary_rounds'] + + # floats should be preserved + self.assertEqual(parse(0.1), 0.1) + self.assertEqual(parse('0.1'), 0.1) + + # 'xx%' should be converted to float + self.assertEqual(parse('10%'), 0.1) + + # ints should be preserved + self.assertEqual(parse(1000), 1000) + self.assertEqual(parse('1000'), 1000) + + #========================================================= + # inspection & serialization + #========================================================= + def test_30_schemes(self): + "test schemes() method" + # NOTE: also checked under test_21 + + # test empty + ctx = CryptContext() + self.assertEqual(ctx.schemes(), ()) + self.assertEqual(ctx.schemes(resolve=True), ()) + + # test sample 1 + ctx = CryptContext(**self.sample_1_dict) + self.assertEqual(ctx.schemes(), tuple(self.sample_1_schemes)) + self.assertEqual(ctx.schemes(resolve=True), tuple(self.sample_1_handlers)) + + # test sample 2 + ctx = CryptContext(**self.sample_2_dict) + self.assertEqual(ctx.schemes(), ()) + + def test_31_default_scheme(self): + "test default_scheme() method" + # NOTE: also checked under test_23 + + # test empty + ctx = CryptContext() + self.assertRaises(KeyError, ctx.default_scheme) + + # test sample 1 + ctx = CryptContext(**self.sample_1_dict) + self.assertEqual(ctx.default_scheme(), "md5_crypt") + self.assertEqual(ctx.default_scheme(resolve=True), hash.md5_crypt) + + # test sample 2 + ctx = CryptContext(**self.sample_2_dict) + self.assertRaises(KeyError, ctx.default_scheme) + + # test defaults to first in scheme + ctx = CryptContext(schemes=self.sample_1_schemes) + self.assertEqual(ctx.default_scheme(), "des_crypt") + + # categories tested under test_23 + + def test_32_handler(self): + "test handler() method" + + # default for empty + ctx = CryptContext() + self.assertRaises(KeyError, ctx.handler) + + # default for sample 1 + ctx = CryptContext(**self.sample_1_dict) + self.assertEqual(ctx.handler(), hash.md5_crypt) + + # by name + self.assertEqual(ctx.handler("des_crypt"), hash.des_crypt) + + # name not in schemes + self.assertRaises(KeyError, ctx.handler, "mysql323") + + # TODO: per-category + + def test_33_options(self): + "test internal _get_record_options() method" + def options(ctx, scheme, category=None): + return ctx._get_record_options(scheme, category)[0] + + # this checks that (3 schemes, 3 categories) inherit options correctly. + # the 'user' category is not present in the options. + cc4 = CryptContext( + schemes = [ "sha512_crypt", "des_crypt", "bsdi_crypt"], + deprecated = ["sha512_crypt", "des_crypt"], + all__vary_rounds = 0.1, + bsdi_crypt__vary_rounds=0.2, + sha512_crypt__max_rounds = 20000, + admin__context__deprecated = [ "des_crypt", "bsdi_crypt" ], + admin__all__vary_rounds = 0.05, + admin__bsdi_crypt__vary_rounds=0.3, + admin__sha512_crypt__max_rounds = 40000, + ) + self.assertEqual(cc4._categories, ("admin",)) + + # + # sha512_crypt + # + self.assertEqual(options(cc4, "sha512_crypt"), dict( + deprecated=True, + vary_rounds=0.1, # inherited from all__ max_rounds=20000, )) - self.assertEqual(p4.get_options("sha512_crypt", "user"), dict( - vary_rounds="10%", + self.assertEqual(options(cc4, "sha512_crypt", "user"), dict( + deprecated=True, # unconfigured category inherits from default + vary_rounds=0.1, max_rounds=20000, )) - self.assertEqual(p4.get_options("sha512_crypt", "admin"), dict( - vary_rounds="5%", - max_rounds=40000, + self.assertEqual(options(cc4, "sha512_crypt", "admin"), dict( + # NOT deprecated - context option overridden per-category + vary_rounds=0.05, # global overridden per-cateogry + max_rounds=40000, # overridden per-category )) - def test_14_handler_is_deprecated(self): - "test handler_is_deprecated() method" - pa = CryptPolicy(**self.sample_config_1pd) - pb = CryptPolicy(**self.sample_config_5pd) - - self.assertFalse(pa.handler_is_deprecated("des_crypt")) - self.assertFalse(pa.handler_is_deprecated(hash.bsdi_crypt)) - self.assertFalse(pa.handler_is_deprecated("sha512_crypt")) - - self.assertTrue(pb.handler_is_deprecated("des_crypt")) - self.assertFalse(pb.handler_is_deprecated(hash.bsdi_crypt)) - self.assertFalse(pb.handler_is_deprecated("sha512_crypt")) - - #check categories as well - self.assertTrue(pb.handler_is_deprecated("des_crypt", "user")) - self.assertFalse(pb.handler_is_deprecated("bsdi_crypt", "user")) - self.assertTrue(pb.handler_is_deprecated("des_crypt", "admin")) - self.assertTrue(pb.handler_is_deprecated("bsdi_crypt", "admin")) - - # check deprecation is overridden per category - pc = CryptPolicy( - schemes=["md5_crypt", "des_crypt"], - deprecated=["md5_crypt"], - user__context__deprecated=["des_crypt"], - ) - self.assertTrue(pc.handler_is_deprecated("md5_crypt")) - self.assertFalse(pc.handler_is_deprecated("des_crypt")) - self.assertFalse(pc.handler_is_deprecated("md5_crypt", "user")) - self.assertTrue(pc.handler_is_deprecated("des_crypt", "user")) + # + # des_crypt + # + self.assertEqual(options(cc4, "des_crypt"), dict( + deprecated=True, + vary_rounds=0.1, + )) - def test_15_min_verify_time(self): - "test get_min_verify_time() method" - # silence deprecation warnings for min verify time - warnings.filterwarnings("ignore", category=DeprecationWarning) + self.assertEqual(options(cc4, "des_crypt", "user"), dict( + deprecated=True, # unconfigured category inherits from default + vary_rounds=0.1, + )) - pa = CryptPolicy() - self.assertEqual(pa.get_min_verify_time(), 0) - self.assertEqual(pa.get_min_verify_time('admin'), 0) + self.assertEqual(options(cc4, "des_crypt", "admin"), dict( + deprecated=True, # unchanged though overidden + vary_rounds=0.05, # global overridden per-cateogry + )) - pb = pa.replace(min_verify_time=.1) - self.assertEqual(pb.get_min_verify_time(), .1) - self.assertEqual(pb.get_min_verify_time('admin'), .1) + # + # bsdi_crypt + # + self.assertEqual(options(cc4, "bsdi_crypt"), dict( + vary_rounds=0.2, # overridden from all__vary_rounds + )) - pc = pa.replace(admin__context__min_verify_time=.2) - self.assertEqual(pc.get_min_verify_time(), 0) - self.assertEqual(pc.get_min_verify_time('admin'), .2) + self.assertEqual(options(cc4, "bsdi_crypt", "user"), dict( + vary_rounds=0.2, # unconfigured category inherits from default + )) - pd = pb.replace(admin__context__min_verify_time=.2) - self.assertEqual(pd.get_min_verify_time(), .1) - self.assertEqual(pd.get_min_verify_time('admin'), .2) + self.assertEqual(options(cc4, "bsdi_crypt", "admin"), dict( + vary_rounds=0.3, + deprecated=True, # deprecation set per-category + )) - #========================================================= - #serialization - #========================================================= - def test_20_iter_config(self): - "test iter_config() method" - p5 = CryptPolicy(**self.sample_config_5pd) - self.assertEqual(dict(p5.iter_config()), self.sample_config_5pd) - self.assertEqual(dict(p5.iter_config(resolve=True)), self.sample_config_5prd) - self.assertEqual(dict(p5.iter_config(ini=True)), self.sample_config_5pid) - - def test_21_to_dict(self): + def test_34_to_dict(self): "test to_dict() method" - p5 = CryptPolicy(**self.sample_config_5pd) - self.assertEqual(p5.to_dict(), self.sample_config_5pd) - self.assertEqual(p5.to_dict(resolve=True), self.sample_config_5prd) + # NOTE: this is tested all throughout this test case. + ctx = CryptContext(**self.sample_1_dict) + self.assertEqual(ctx.to_dict(), self.sample_1_dict) + self.assertEqual(ctx.to_dict(resolve=True), self.sample_1_resolved_dict) - def test_22_to_string(self): + def test_35_to_string(self): "test to_string() method" - pa = CryptPolicy(**self.sample_config_5pd) - s = pa.to_string() #NOTE: can't compare string directly, ordering etc may not match - pb = CryptPolicy.from_string(s) - self.assertEqual(pb.to_dict(), self.sample_config_5pd) - #========================================================= - # - #========================================================= + # create ctx and serialize + ctx = CryptContext(**self.sample_1_dict) + dump = ctx.to_string() -#========================================================= -#CryptContext -#========================================================= -class CryptContextTest(TestCase): - "test CryptContext class" - descriptionPrefix = "CryptContext" + # check ctx->string returns canonical format. + # NOTE: ConfigParser for PY26 and earlier didn't use OrderedDict, + # so to_string() won't get order correct. + # so we skip this test. + import sys + if sys.version_info >= (2,7): + self.assertEqual(dump, self.sample_1_unicode) + + # check ctx->string->ctx->dict returns original + ctx2 = CryptContext.from_string(dump) + self.assertEqual(ctx2.to_dict(), self.sample_1_dict) + + # TODO: test other features, like the unmanaged handler warning. + # TODO: test compact mode, section #========================================================= - #constructor + # password hash api #========================================================= - def test_00_constructor(self): - "test constructor" - #create crypt context using handlers - cc = CryptContext([hash.md5_crypt, hash.bsdi_crypt, hash.des_crypt]) - c,b,a = cc.policy.iter_handlers() - self.assertIs(a, hash.des_crypt) - self.assertIs(b, hash.bsdi_crypt) - self.assertIs(c, hash.md5_crypt) - - #create context using names - cc = CryptContext(["md5_crypt", "bsdi_crypt", "des_crypt"]) - c,b,a = cc.policy.iter_handlers() - self.assertIs(a, hash.des_crypt) - self.assertIs(b, hash.bsdi_crypt) - self.assertIs(c, hash.md5_crypt) - - #TODO: test policy & other options - - def test_01_replace(self): - "test replace()" - - cc = CryptContext(["md5_crypt", "bsdi_crypt", "des_crypt"]) - self.assertIs(cc.policy.get_handler(), hash.md5_crypt) - - cc2 = cc.replace() - self.assertIsNot(cc2, cc) - self.assertIs(cc2.policy, cc.policy) - - cc3 = cc.replace(default="bsdi_crypt") - self.assertIsNot(cc3, cc) - self.assertIsNot(cc3.policy, cc.policy) - self.assertIs(cc3.policy.get_handler(), hash.bsdi_crypt) - - def test_02_no_handlers(self): - "test no handlers" - - #check constructor... - cc = CryptContext() - self.assertRaises(KeyError, cc.identify, 'hash', required=True) - self.assertRaises(KeyError, cc.encrypt, 'secret') - self.assertRaises(KeyError, cc.verify, 'secret', 'hash') + nonstring_vectors = [ + (None, {}), + (None, {"scheme": "des_crypt"}), + (1, {}), + ((), {}), + ] + + def test_40_basic(self): + "test basic encrypt/identify/verify functionality" + handlers = [hash.md5_crypt, hash.des_crypt, hash.bsdi_crypt] + cc = CryptContext(handlers) + + #run through handlers + for crypt in handlers: + h = cc.encrypt("test", scheme=crypt.name) + self.assertEqual(cc.identify(h), crypt.name) + self.assertEqual(cc.identify(h, resolve=True), crypt) + self.assertTrue(cc.verify('test', h)) + self.assertTrue(not cc.verify('notest', h)) - #check updating policy after the fact... - cc = CryptContext(['md5_crypt']) - p = CryptPolicy(schemes=[]) - cc.policy = p + #test default + h = cc.encrypt("test") + self.assertEqual(cc.identify(h), "md5_crypt") - self.assertRaises(KeyError, cc.identify, 'hash', required=True) - self.assertRaises(KeyError, cc.encrypt, 'secret') - self.assertRaises(KeyError, cc.verify, 'secret', 'hash') + #test genhash + h = cc.genhash('secret', cc.genconfig()) + self.assertEqual(cc.identify(h), 'md5_crypt') - #========================================================= - #policy adaptation - #========================================================= - sample_policy_1 = dict( - schemes = [ "des_crypt", "md5_crypt", "phpass", "bsdi_crypt", - "sha256_crypt"], - deprecated = [ "des_crypt", ], - default = "sha256_crypt", - bsdi_crypt__max_rounds = 30, - bsdi_crypt__default_rounds = 25, - bsdi_crypt__vary_rounds = 0, - sha256_crypt__max_rounds = 3000, - sha256_crypt__min_rounds = 2000, - sha256_crypt__default_rounds = 3000, - phpass__ident = "H", - phpass__default_rounds = 7, - ) + h = cc.genhash('secret', cc.genconfig(), scheme='md5_crypt') + self.assertEqual(cc.identify(h), 'md5_crypt') - def test_10_01_genconfig_settings(self): - "test genconfig() settings" - cc = CryptContext(policy=None, - schemes=["md5_crypt", "phpass"], + self.assertRaises(ValueError, cc.genhash, 'secret', cc.genconfig(), scheme="des_crypt") + + def test_41_genconfig(self): + "test genconfig() method" + cc = CryptContext(schemes=["md5_crypt", "phpass"], phpass__ident="H", phpass__default_rounds=7, ) - # hash specific settings + # uses default scheme self.assertTrue(cc.genconfig().startswith("$1$")) - self.assertEqual( - cc.genconfig(scheme="phpass", salt='.'*8), - '$H$5........', - ) + + # override scheme + self.assertTrue(cc.genconfig(scheme="phpass").startswith("$H$5")) + + # override scheme & custom settings self.assertEqual( cc.genconfig(scheme="phpass", salt='.'*8, rounds=8, ident='P'), '$P$6........', ) - # unsupported hash settings should be rejected - self.assertRaises(KeyError, cc.replace, md5_crypt__ident="P") + #-------------------------------------------------------------- + # border cases + #-------------------------------------------------------------- + + # throws error without schemes + self.assertRaises(KeyError, CryptContext().genconfig) + + def test_42_genhash(self): + "test genhash() method" + + #-------------------------------------------------------------- + # border cases + #-------------------------------------------------------------- + + # rejects non-string secrets + cc = CryptContext(["des_crypt"]) + hash = cc.encrypt('stub') + for secret, kwds in self.nonstring_vectors: + self.assertRaises(TypeError, cc.genhash, secret, hash, **kwds) + + # rejects non-string hashes + cc = CryptContext(["des_crypt"]) + for hash, kwds in self.nonstring_vectors: + self.assertRaises(TypeError, cc.genhash, 'secret', hash, **kwds) + + # .. but should accept None if default scheme lacks config string + cc = CryptContext(["mysql323"]) + self.assertIsInstance(cc.genhash("stub", None), str) + + # throws error without schemes + self.assertRaises(KeyError, CryptContext().genhash, 'secret', 'hash') + + def test_43_encrypt(self): + "test encrypt() method" + cc = CryptContext(**self.sample_4_dict) + + # hash specific settings + self.assertEqual( + cc.encrypt("password", scheme="phpass", salt='.'*8), + '$H$5........De04R5Egz0aq8Tf.1eVhY/', + ) + self.assertEqual( + cc.encrypt("password", scheme="phpass", salt='.'*8, ident="P"), + '$P$5........De04R5Egz0aq8Tf.1eVhY/', + ) + + # NOTE: more thorough job of rounds limits done below. - def test_10_02_genconfig_rounds_limits(self): - "test genconfig() policy rounds limits" - cc = CryptContext(policy=None, - schemes=["sha256_crypt"], + # min rounds + with catch_warnings(record=True) as wlog: + self.assertEqual( + cc.encrypt("password", rounds=1999, salt="nacl"), + '$5$rounds=2000$nacl$9/lTZ5nrfPuz8vphznnmHuDGFuvjSNvOEDsGmGfsS97', + ) + self.consumeWarningList(wlog, PasslibConfigWarning) + + self.assertEqual( + cc.encrypt("password", rounds=2001, salt="nacl"), + '$5$rounds=2001$nacl$8PdeoPL4aXQnJ0woHhqgIw/efyfCKC2WHneOpnvF.31' + ) + self.consumeWarningList(wlog) + + # NOTE: max rounds, etc tested in genconfig() + + # make default > max throws error if attempted + self.assertRaises(ValueError, cc.copy, + sha256_crypt__default_rounds=4000) + + #-------------------------------------------------------------- + # border cases + #-------------------------------------------------------------- + + # rejects non-string secrets + cc = CryptContext(["des_crypt"]) + for secret, kwds in self.nonstring_vectors: + self.assertRaises(TypeError, cc.encrypt, secret, **kwds) + + # throws error without schemes + self.assertRaises(KeyError, CryptContext().encrypt, 'secret') + + def test_44_identify(self): + "test identify() border cases" + handlers = ["md5_crypt", "des_crypt", "bsdi_crypt"] + cc = CryptContext(handlers) + + #check unknown hash + self.assertEqual(cc.identify('$9$232323123$1287319827'), None) + self.assertRaises(ValueError, cc.identify, '$9$232323123$1287319827', required=True) + + #-------------------------------------------------------------- + # border cases + #-------------------------------------------------------------- + + # rejects non-string hashes + cc = CryptContext(["des_crypt"]) + for hash, kwds in self.nonstring_vectors: + self.assertRaises(TypeError, cc.identify, hash, **kwds) + + # throws error without schemes + cc = CryptContext() + self.assertIs(cc.identify('hash'), None) + self.assertRaises(KeyError, cc.identify, 'hash', required=True) + + def test_45_verify(self): + "test verify() scheme kwd" + handlers = ["md5_crypt", "des_crypt", "bsdi_crypt"] + cc = CryptContext(handlers) + + h = hash.md5_crypt.encrypt("test") + + #check base verify + self.assertTrue(cc.verify("test", h)) + self.assertTrue(not cc.verify("notest", h)) + + #check verify using right alg + self.assertTrue(cc.verify('test', h, scheme='md5_crypt')) + self.assertTrue(not cc.verify('notest', h, scheme='md5_crypt')) + + #check verify using wrong alg + self.assertRaises(ValueError, cc.verify, 'test', h, scheme='bsdi_crypt') + + #-------------------------------------------------------------- + # border cases + #-------------------------------------------------------------- + + # rejects non-string secrets + cc = CryptContext(["des_crypt"]) + h = cc.encrypt('stub') + for secret, kwds in self.nonstring_vectors: + self.assertRaises(TypeError, cc.verify, secret, h, **kwds) + + # rejects non-string hashes + cc = CryptContext(["des_crypt"]) + for h, kwds in self.nonstring_vectors: + self.assertRaises(TypeError, cc.verify, 'secret', h, **kwds) + + # throws error without schemes + self.assertRaises(KeyError, CryptContext().verify, 'secret', 'hash') + + def test_46_hash_needs_update(self): + "test hash_needs_update() method" + cc = CryptContext(**self.sample_4_dict) + + #check deprecated scheme + self.assertTrue(cc.hash_needs_update('9XXD4trGYeGJA')) + self.assertFalse(cc.hash_needs_update('$1$J8HC2RCr$HcmM.7NxB2weSvlw2FgzU0')) + + #check min rounds + self.assertTrue(cc.hash_needs_update('$5$rounds=1999$jD81UCoo.zI.UETs$Y7qSTQ6mTiU9qZB4fRr43wRgQq4V.5AAf7F97Pzxey/')) + self.assertFalse(cc.hash_needs_update('$5$rounds=2000$228SSRje04cnNCaQ$YGV4RYu.5sNiBvorQDlO0WWQjyJVGKBcJXz3OtyQ2u8')) + + #check max rounds + self.assertFalse(cc.hash_needs_update('$5$rounds=3000$fS9iazEwTKi7QPW4$VasgBC8FqlOvD7x2HhABaMXCTh9jwHclPA9j5YQdns.')) + self.assertTrue(cc.hash_needs_update('$5$rounds=3001$QlFHHifXvpFX4PLs$/0ekt7lSs/lOikSerQ0M/1porEHxYq7W/2hdFpxA3fA')) + + #-------------------------------------------------------------- + # border cases + #-------------------------------------------------------------- + + # rejects non-string hashes + cc = CryptContext(["des_crypt"]) + for hash, kwds in self.nonstring_vectors: + self.assertRaises(TypeError, cc.hash_needs_update, hash, **kwds) + + # throws error without schemes + self.assertRaises(KeyError, CryptContext().hash_needs_update, 'hash') + + def test_47_verify_and_update(self): + "test verify_and_update()" + cc = CryptContext(**self.sample_4_dict) + + #create some hashes + h1 = cc.encrypt("password", scheme="des_crypt") + h2 = cc.encrypt("password", scheme="sha256_crypt") + + #check bad password, deprecated hash + ok, new_hash = cc.verify_and_update("wrongpass", h1) + self.assertFalse(ok) + self.assertIs(new_hash, None) + + #check bad password, good hash + ok, new_hash = cc.verify_and_update("wrongpass", h2) + self.assertFalse(ok) + self.assertIs(new_hash, None) + + #check right password, deprecated hash + ok, new_hash = cc.verify_and_update("password", h1) + self.assertTrue(ok) + self.assertTrue(cc.identify(new_hash), "sha256_crypt") + + #check right password, good hash + ok, new_hash = cc.verify_and_update("password", h2) + self.assertTrue(ok) + self.assertIs(new_hash, None) + + #-------------------------------------------------------------- + # border cases + #-------------------------------------------------------------- + + # rejects non-string secrets + cc = CryptContext(["des_crypt"]) + hash = cc.encrypt('stub') + for secret, kwds in self.nonstring_vectors: + self.assertRaises(TypeError, cc.verify_and_update, secret, hash, **kwds) + + # rejects non-string hashes + cc = CryptContext(["des_crypt"]) + for hash, kwds in self.nonstring_vectors: + self.assertRaises(TypeError, cc.verify_and_update, 'secret', hash, **kwds) + + # throws error without schemes + self.assertRaises(KeyError, CryptContext().verify_and_update, 'secret', 'hash') + + #========================================================= + # rounds options + #========================================================= + # NOTE: the follow tests check how _CryptRecord handles + # the min/max/default/vary_rounds options, via the output of + # genconfig(). it's assumed encrypt() takes the same codepath. + + def test_50_rounds_limits(self): + "test rounds limits" + cc = CryptContext(schemes=["sha256_crypt"], all__min_rounds=2000, all__max_rounds=3000, all__default_rounds=2500, @@ -631,7 +1031,7 @@ class CryptContextTest(TestCase): with catch_warnings(record=True) as wlog: # set below handler min - c2 = cc.replace(all__min_rounds=500, all__max_rounds=None, + c2 = cc.copy(all__min_rounds=500, all__max_rounds=None, all__default_rounds=500) self.consumeWarningList(wlog, [PasslibConfigWarning]*2) self.assertEqual(c2.genconfig(salt="nacl"), "$5$rounds=1000$nacl$") @@ -661,7 +1061,7 @@ class CryptContextTest(TestCase): # max rounds with catch_warnings(record=True) as wlog: # set above handler max - c2 = cc.replace(all__max_rounds=int(1e9)+500, all__min_rounds=None, + c2 = cc.copy(all__max_rounds=int(1e9)+500, all__min_rounds=None, all__default_rounds=int(1e9)+500) self.consumeWarningList(wlog, [PasslibConfigWarning]*2) self.assertEqual(c2.genconfig(salt="nacl"), @@ -693,225 +1093,119 @@ class CryptContextTest(TestCase): self.assertEqual(cc.genconfig(salt="nacl"), '$5$rounds=2500$nacl$') # fallback default rounds - use handler's - c2 = cc.replace(all__default_rounds=None, all__max_rounds=50000) - self.assertEqual(c2.genconfig(salt="nacl"), '$5$rounds=40000$nacl$') + df = hash.sha256_crypt.default_rounds + c2 = cc.copy(all__default_rounds=None, all__max_rounds=df<<1) + self.assertEqual(c2.genconfig(salt="nacl"), + '$5$rounds=%d$nacl$' % df) # fallback default rounds - use handler's, but clipped to max rounds - c2 = cc.replace(all__default_rounds=None, all__max_rounds=3000) + c2 = cc.copy(all__default_rounds=None, all__max_rounds=3000) self.assertEqual(c2.genconfig(salt="nacl"), '$5$rounds=3000$nacl$') # TODO: test default falls back to mx / mn if handler has no default. #default rounds - out of bounds - self.assertRaises(ValueError, cc.replace, all__default_rounds=1999) - cc.policy.replace(all__default_rounds=2000) - cc.policy.replace(all__default_rounds=3000) - self.assertRaises(ValueError, cc.replace, all__default_rounds=3001) + self.assertRaises(ValueError, cc.copy, all__default_rounds=1999) + cc.copy(all__default_rounds=2000) + cc.copy(all__default_rounds=3000) + self.assertRaises(ValueError, cc.copy, all__default_rounds=3001) # invalid min/max bounds - c2 = CryptContext(policy=None, schemes=["sha256_crypt"]) - self.assertRaises(ValueError, c2.replace, all__min_rounds=-1) - self.assertRaises(ValueError, c2.replace, all__max_rounds=-1) - self.assertRaises(ValueError, c2.replace, all__min_rounds=2000, + c2 = CryptContext(schemes=["sha256_crypt"]) + self.assertRaises(ValueError, c2.copy, all__min_rounds=-1) + self.assertRaises(ValueError, c2.copy, all__max_rounds=-1) + self.assertRaises(ValueError, c2.copy, all__min_rounds=2000, all__max_rounds=1999) - def test_10_03_genconfig_linear_vary_rounds(self): - "test genconfig() linear vary rounds" - cc = CryptContext(policy=None, - schemes=["sha256_crypt"], + def test_51_linear_vary_rounds(self): + "test linear vary rounds" + cc = CryptContext(schemes=["sha256_crypt"], all__min_rounds=1995, all__max_rounds=2005, all__default_rounds=2000, ) # test negative - self.assertRaises(ValueError, cc.replace, all__vary_rounds=-1) - self.assertRaises(ValueError, cc.replace, all__vary_rounds="-1%") - self.assertRaises(ValueError, cc.replace, all__vary_rounds="101%") + self.assertRaises(ValueError, cc.copy, all__vary_rounds=-1) + self.assertRaises(ValueError, cc.copy, all__vary_rounds="-1%") + self.assertRaises(ValueError, cc.copy, all__vary_rounds="101%") # test static - c2 = cc.replace(all__vary_rounds=0) + c2 = cc.copy(all__vary_rounds=0) self.assert_rounds_range(c2, "sha256_crypt", 2000, 2000) - c2 = cc.replace(all__vary_rounds="0%") + c2 = cc.copy(all__vary_rounds="0%") self.assert_rounds_range(c2, "sha256_crypt", 2000, 2000) # test absolute - c2 = cc.replace(all__vary_rounds=1) + c2 = cc.copy(all__vary_rounds=1) self.assert_rounds_range(c2, "sha256_crypt", 1999, 2001) - c2 = cc.replace(all__vary_rounds=100) + c2 = cc.copy(all__vary_rounds=100) self.assert_rounds_range(c2, "sha256_crypt", 1995, 2005) # test relative - c2 = cc.replace(all__vary_rounds="0.1%") + c2 = cc.copy(all__vary_rounds="0.1%") self.assert_rounds_range(c2, "sha256_crypt", 1998, 2002) - c2 = cc.replace(all__vary_rounds="100%") + c2 = cc.copy(all__vary_rounds="100%") self.assert_rounds_range(c2, "sha256_crypt", 1995, 2005) - def test_10_03_genconfig_log2_vary_rounds(self): - "test genconfig() log2 vary rounds" - cc = CryptContext(policy=None, - schemes=["bcrypt"], + def test_52_log2_vary_rounds(self): + "test log2 vary rounds" + cc = CryptContext(schemes=["bcrypt"], all__min_rounds=15, all__max_rounds=25, all__default_rounds=20, ) # test negative - self.assertRaises(ValueError, cc.replace, all__vary_rounds=-1) - self.assertRaises(ValueError, cc.replace, all__vary_rounds="-1%") - self.assertRaises(ValueError, cc.replace, all__vary_rounds="101%") + self.assertRaises(ValueError, cc.copy, all__vary_rounds=-1) + self.assertRaises(ValueError, cc.copy, all__vary_rounds="-1%") + self.assertRaises(ValueError, cc.copy, all__vary_rounds="101%") # test static - c2 = cc.replace(all__vary_rounds=0) + c2 = cc.copy(all__vary_rounds=0) self.assert_rounds_range(c2, "bcrypt", 20, 20) - c2 = cc.replace(all__vary_rounds="0%") + c2 = cc.copy(all__vary_rounds="0%") self.assert_rounds_range(c2, "bcrypt", 20, 20) # test absolute - c2 = cc.replace(all__vary_rounds=1) + c2 = cc.copy(all__vary_rounds=1) self.assert_rounds_range(c2, "bcrypt", 19, 21) - c2 = cc.replace(all__vary_rounds=100) + c2 = cc.copy(all__vary_rounds=100) self.assert_rounds_range(c2, "bcrypt", 15, 25) # test relative - should shift over at 50% mark - c2 = cc.replace(all__vary_rounds="1%") + c2 = cc.copy(all__vary_rounds="1%") self.assert_rounds_range(c2, "bcrypt", 20, 20) - c2 = cc.replace(all__vary_rounds="49%") + c2 = cc.copy(all__vary_rounds="49%") self.assert_rounds_range(c2, "bcrypt", 20, 20) - c2 = cc.replace(all__vary_rounds="50%") + c2 = cc.copy(all__vary_rounds="50%") self.assert_rounds_range(c2, "bcrypt", 19, 20) - c2 = cc.replace(all__vary_rounds="100%") + c2 = cc.copy(all__vary_rounds="100%") self.assert_rounds_range(c2, "bcrypt", 15, 21) def assert_rounds_range(self, context, scheme, lower, upper): "helper to check vary_rounds covers specified range" # NOTE: this runs enough times the min and max *should* be hit, # though there's a faint chance it will randomly fail. - handler = context.policy.get_handler(scheme) + handler = context.handler(scheme) salt = handler.default_salt_chars[0:1] * handler.max_salt_size seen = set() for i in irange(300): h = context.genconfig(scheme, salt=salt) r = handler.from_string(h).rounds seen.add(r) - self.assertEqual(min(seen), lower, "vary_rounds lower bound:") - self.assertEqual(max(seen), upper, "vary_rounds upper bound:") - - def test_11_encrypt_settings(self): - "test encrypt() honors policy settings" - cc = CryptContext(**self.sample_policy_1) - - # hash specific settings - self.assertEqual( - cc.encrypt("password", scheme="phpass", salt='.'*8), - '$H$5........De04R5Egz0aq8Tf.1eVhY/', - ) - self.assertEqual( - cc.encrypt("password", scheme="phpass", salt='.'*8, ident="P"), - '$P$5........De04R5Egz0aq8Tf.1eVhY/', - ) - - # NOTE: more thorough job of rounds limits done in genconfig() test, - # which is much cheaper, and shares the same codebase. - - # min rounds - with catch_warnings(record=True) as wlog: - self.assertEqual( - cc.encrypt("password", rounds=1999, salt="nacl"), - '$5$rounds=2000$nacl$9/lTZ5nrfPuz8vphznnmHuDGFuvjSNvOEDsGmGfsS97', - ) - self.consumeWarningList(wlog, PasslibConfigWarning) - - self.assertEqual( - cc.encrypt("password", rounds=2001, salt="nacl"), - '$5$rounds=2001$nacl$8PdeoPL4aXQnJ0woHhqgIw/efyfCKC2WHneOpnvF.31' - ) - self.consumeWarningList(wlog) - - # max rounds, etc tested in genconfig() - - # make default > max throws error if attempted - self.assertRaises(ValueError, cc.replace, - sha256_crypt__default_rounds=4000) - - def test_12_hash_needs_update(self): - "test hash_needs_update() method" - cc = CryptContext(**self.sample_policy_1) - - #check deprecated scheme - self.assertTrue(cc.hash_needs_update('9XXD4trGYeGJA')) - self.assertFalse(cc.hash_needs_update('$1$J8HC2RCr$HcmM.7NxB2weSvlw2FgzU0')) - - #check min rounds - self.assertTrue(cc.hash_needs_update('$5$rounds=1999$jD81UCoo.zI.UETs$Y7qSTQ6mTiU9qZB4fRr43wRgQq4V.5AAf7F97Pzxey/')) - self.assertFalse(cc.hash_needs_update('$5$rounds=2000$228SSRje04cnNCaQ$YGV4RYu.5sNiBvorQDlO0WWQjyJVGKBcJXz3OtyQ2u8')) - - #check max rounds - self.assertFalse(cc.hash_needs_update('$5$rounds=3000$fS9iazEwTKi7QPW4$VasgBC8FqlOvD7x2HhABaMXCTh9jwHclPA9j5YQdns.')) - self.assertTrue(cc.hash_needs_update('$5$rounds=3001$QlFHHifXvpFX4PLs$/0ekt7lSs/lOikSerQ0M/1porEHxYq7W/2hdFpxA3fA')) + self.assertEqual(min(seen), lower, "vary_rounds had wrong lower limit:") + self.assertEqual(max(seen), upper, "vary_rounds had wrong upper limit:") #========================================================= - #identify + # feature tests #========================================================= - def test_20_basic(self): - "test basic encrypt/identify/verify functionality" - handlers = [hash.md5_crypt, hash.des_crypt, hash.bsdi_crypt] - cc = CryptContext(handlers, policy=None) - - #run through handlers - for crypt in handlers: - h = cc.encrypt("test", scheme=crypt.name) - self.assertEqual(cc.identify(h), crypt.name) - self.assertEqual(cc.identify(h, resolve=True), crypt) - self.assertTrue(cc.verify('test', h)) - self.assertTrue(not cc.verify('notest', h)) - - #test default - h = cc.encrypt("test") - self.assertEqual(cc.identify(h), "md5_crypt") - - #test genhash - h = cc.genhash('secret', cc.genconfig()) - self.assertEqual(cc.identify(h), 'md5_crypt') - - h = cc.genhash('secret', cc.genconfig(), scheme='md5_crypt') - self.assertEqual(cc.identify(h), 'md5_crypt') - - self.assertRaises(ValueError, cc.genhash, 'secret', cc.genconfig(), scheme="des_crypt") - - def test_21_identify(self): - "test identify() border cases" - handlers = ["md5_crypt", "des_crypt", "bsdi_crypt"] - cc = CryptContext(handlers, policy=None) - - #check unknown hash - self.assertEqual(cc.identify('$9$232323123$1287319827'), None) - self.assertRaises(ValueError, cc.identify, '$9$232323123$1287319827', required=True) - - def test_22_verify(self): - "test verify() scheme kwd" - handlers = ["md5_crypt", "des_crypt", "bsdi_crypt"] - cc = CryptContext(handlers, policy=None) - - h = hash.md5_crypt.encrypt("test") - - #check base verify - self.assertTrue(cc.verify("test", h)) - self.assertTrue(not cc.verify("notest", h)) - - #check verify using right alg - self.assertTrue(cc.verify('test', h, scheme='md5_crypt')) - self.assertTrue(not cc.verify('notest', h, scheme='md5_crypt')) - - #check verify using wrong alg - self.assertRaises(ValueError, cc.verify, 'test', h, scheme='bsdi_crypt') - - def test_24_min_verify_time(self): + def test_60_min_verify_time(self): "test verify() honors min_verify_time" #NOTE: this whole test assumes time.sleep() and tick() # have better than 100ms accuracy - set via delta. @@ -967,111 +1261,10 @@ class CryptContextTest(TestCase): self.assertAlmostEqual(elapsed, max_delay, delta=delta) self.consumeWarningList(wlog, ".*verify exceeded min_verify_time") - def test_25_verify_and_update(self): - "test verify_and_update()" - cc = CryptContext(**self.sample_policy_1) - - #create some hashes - h1 = cc.encrypt("password", scheme="des_crypt") - h2 = cc.encrypt("password", scheme="sha256_crypt") - - #check bad password, deprecated hash - ok, new_hash = cc.verify_and_update("wrongpass", h1) - self.assertFalse(ok) - self.assertIs(new_hash, None) - - #check bad password, good hash - ok, new_hash = cc.verify_and_update("wrongpass", h2) - self.assertFalse(ok) - self.assertIs(new_hash, None) - - #check right password, deprecated hash - ok, new_hash = cc.verify_and_update("password", h1) - self.assertTrue(ok) - self.assertTrue(cc.identify(new_hash), "sha256_crypt") - - #check right password, good hash - ok, new_hash = cc.verify_and_update("password", h2) - self.assertTrue(ok) - self.assertIs(new_hash, None) - - #========================================================= - # border cases - #========================================================= - def test_30_nonstring_hash(self): - "test non-string hash values cause error" - # - # test hash=None or some other non-string causes TypeError - # and that explicit-scheme code path behaves the same. - # - cc = CryptContext(["des_crypt"]) - for hash, kwds in [ - (None, {}), - (None, {"scheme": "des_crypt"}), - (1, {}), - ((), {}), - ]: - - self.assertRaises(TypeError, cc.identify, hash, **kwds) - self.assertRaises(TypeError, cc.genhash, 'stub', hash, **kwds) - self.assertRaises(TypeError, cc.verify, 'stub', hash, **kwds) - self.assertRaises(TypeError, cc.verify_and_update, 'stub', hash, **kwds) - self.assertRaises(TypeError, cc.hash_needs_update, hash, **kwds) - - # - # but genhash *should* accept None if default scheme lacks config string. - # - cc2 = CryptContext(["mysql323"]) - self.assertRaises(TypeError, cc2.identify, None) - self.assertIsInstance(cc2.genhash("stub", None), str) - self.assertRaises(TypeError, cc2.verify, 'stub', None) - self.assertRaises(TypeError, cc2.verify_and_update, 'stub', None) - self.assertRaises(TypeError, cc2.hash_needs_update, None) - - - def test_31_nonstring_secret(self): - "test non-string password values cause error" - cc = CryptContext(["des_crypt"]) - hash = cc.encrypt("stub") - # - # test secret=None, or some other non-string causes TypeError - # - for secret, kwds in [ - (None, {}), - (None, {"scheme": "des_crypt"}), - (1, {}), - ((), {}), - ]: - self.assertRaises(TypeError, cc.encrypt, secret, **kwds) - self.assertRaises(TypeError, cc.genhash, secret, hash, **kwds) - self.assertRaises(TypeError, cc.verify, secret, hash, **kwds) - self.assertRaises(TypeError, cc.verify_and_update, secret, hash, **kwds) - - #========================================================= - # other - #========================================================= - def test_90_bcrypt_normhash(self): - "teset verify_and_update / hash_needs_update corrects bcrypt padding" - # see issue 25. - bcrypt = hash.bcrypt - - PASS1 = "loppux" - BAD1 = "$2a$12$oaQbBqq8JnSM1NHRPQGXORm4GCUMqp7meTnkft4zgSnrbhoKdDV0C" - GOOD1 = "$2a$12$oaQbBqq8JnSM1NHRPQGXOOm4GCUMqp7meTnkft4zgSnrbhoKdDV0C" - ctx = CryptContext(["bcrypt"]) - - with catch_warnings(record=True) as wlog: - self.assertTrue(ctx.hash_needs_update(BAD1)) - self.assertFalse(ctx.hash_needs_update(GOOD1)) - - if bcrypt.has_backend(): - self.assertEqual(ctx.verify_and_update(PASS1,GOOD1), (True,None)) - self.assertEqual(ctx.verify_and_update("x",BAD1), (False,None)) - res = ctx.verify_and_update(PASS1, BAD1) - self.assertTrue(res[0] and res[1] and res[1] != BAD1) - - def test_91_passprep(self): + def test_61_passprep(self): "test passprep option" + self.require_stringprep() + # saslprep should normalize pu -> pn pu = u("a\u0300") # unnormalized unicode pn = u("\u00E0") # normalized unicode @@ -1126,8 +1319,45 @@ class CryptContextTest(TestCase): self.assertFalse(ctx.verify(pu, ctx.encrypt(pn, scheme="md5_crypt"))) self.assertTrue(ctx.verify(pu, ctx.encrypt(pn, scheme="sha256_crypt"))) + def test_62_bcrypt_update(self): + "test verify_and_update / hash_needs_update corrects bcrypt padding" + # see issue 25. + bcrypt = hash.bcrypt + + PASS1 = "loppux" + BAD1 = "$2a$12$oaQbBqq8JnSM1NHRPQGXORm4GCUMqp7meTnkft4zgSnrbhoKdDV0C" + GOOD1 = "$2a$12$oaQbBqq8JnSM1NHRPQGXOOm4GCUMqp7meTnkft4zgSnrbhoKdDV0C" + ctx = CryptContext(["bcrypt"]) + + with catch_warnings(record=True) as wlog: + self.assertTrue(ctx.hash_needs_update(BAD1)) + self.assertFalse(ctx.hash_needs_update(GOOD1)) + + if bcrypt.has_backend(): + self.assertEqual(ctx.verify_and_update(PASS1,GOOD1), (True,None)) + self.assertEqual(ctx.verify_and_update("x",BAD1), (False,None)) + ok, new_hash = ctx.verify_and_update(PASS1, BAD1) + self.assertTrue(ok) + self.assertTrue(new_hash and new_hash != BAD1) + + def test_63_bsdi_crypt_update(self): + "test verify_and_update / hash_needs_update correct bsdi even rounds" + even_hash = '_Y/../cG0zkJa6LY6k4c' + odd_hash = '_Z/..TgFg0/ptQtpAgws' + secret = 'test' + ctx = CryptContext(['bsdi_crypt']) + + self.assertTrue(ctx.hash_needs_update(even_hash)) + self.assertFalse(ctx.hash_needs_update(odd_hash)) + + self.assertEqual(ctx.verify_and_update(secret, odd_hash), (True,None)) + self.assertEqual(ctx.verify_and_update("x", even_hash), (False,None)) + ok, new_hash = ctx.verify_and_update(secret, even_hash) + self.assertTrue(ok) + self.assertTrue(new_hash and new_hash != even_hash) + #========================================================= - #eoc + # eoc #========================================================= #========================================================= @@ -1153,44 +1383,25 @@ class LazyCryptContextTest(TestCase): self.assertFalse(has_crypt_handler("dummy_2", True)) - self.assertTrue(cc.policy.handler_is_deprecated("des_crypt")) - self.assertEqual(cc.policy.schemes(), ["dummy_2", "des_crypt"]) + self.assertEqual(cc.schemes(), ("dummy_2", "des_crypt")) + self.assertTrue(cc._is_deprecated_scheme("des_crypt")) self.assertTrue(has_crypt_handler("dummy_2", True)) def test_callable_constructor(self): - "test create_policy() hook, returning CryptPolicy" - self.assertFalse(has_crypt_handler("dummy_2")) - register_crypt_handler_path("dummy_2", "passlib.tests.test_context") - - def create_policy(flag=False): - self.assertTrue(flag) - return CryptPolicy(schemes=iter(["dummy_2", "des_crypt"]), deprecated=["des_crypt"]) - - cc = LazyCryptContext(create_policy=create_policy, flag=True) - - self.assertFalse(has_crypt_handler("dummy_2", True)) - - self.assertTrue(cc.policy.handler_is_deprecated("des_crypt")) - self.assertEqual(cc.policy.schemes(), ["dummy_2", "des_crypt"]) - - self.assertTrue(has_crypt_handler("dummy_2", True)) - - def test_callable_constructor2(self): - "test create_policy() hook, returning dict" self.assertFalse(has_crypt_handler("dummy_2")) register_crypt_handler_path("dummy_2", "passlib.tests.test_context") - def create_policy(flag=False): + def onload(flag=False): self.assertTrue(flag) return dict(schemes=iter(["dummy_2", "des_crypt"]), deprecated=["des_crypt"]) - cc = LazyCryptContext(create_policy=create_policy, flag=True) + cc = LazyCryptContext(onload=onload, flag=True) self.assertFalse(has_crypt_handler("dummy_2", True)) - self.assertTrue(cc.policy.handler_is_deprecated("des_crypt")) - self.assertEqual(cc.policy.schemes(), ["dummy_2", "des_crypt"]) + self.assertEqual(cc.schemes(), ("dummy_2", "des_crypt")) + self.assertTrue(cc._is_deprecated_scheme("des_crypt")) self.assertTrue(has_crypt_handler("dummy_2", True)) |