summaryrefslogtreecommitdiff
path: root/passlib/tests/test_context.py
diff options
context:
space:
mode:
authorEli Collins <elic@assurancetechnologies.com>2012-04-17 23:14:51 -0400
committerEli Collins <elic@assurancetechnologies.com>2012-04-17 23:14:51 -0400
commit64ab6fc89b497efa9169f11d55251e417c4db0ba (patch)
treeb3f6f5dc27b87a6bc90cb3686fa98239ee8ff053 /passlib/tests/test_context.py
parent8eb4c4d3b58eec6802c698ddbf357b2fd243a68c (diff)
parentcd029846fdc0c3d7ffc7f53caad4579e7e0e8725 (diff)
downloadpasslib-ironpython-support-dev.tar.gz
Merge from defaultironpython-support-dev
Diffstat (limited to 'passlib/tests/test_context.py')
-rw-r--r--passlib/tests/test_context.py1737
1 files changed, 974 insertions, 763 deletions
diff --git a/passlib/tests/test_context.py b/passlib/tests/test_context.py
index d1e4511..d039ac6 100644
--- a/passlib/tests/test_context.py
+++ b/passlib/tests/test_context.py
@@ -1,9 +1,14 @@
-"""tests for passlib.pwhash -- (c) Assurance Technologies 2003-2009"""
+"""tests for passlib.context"""
#=========================================================
#imports
#=========================================================
from __future__ import with_statement
+from passlib.utils.compat import PY3
#core
+if PY3:
+ from configparser import NoSectionError
+else:
+ from ConfigParser import NoSectionError
import hashlib
from logging import getLogger
import os
@@ -17,7 +22,7 @@ except ImportError:
resource_filename = None
#pkg
from passlib import hash
-from passlib.context import CryptContext, CryptPolicy, LazyCryptContext
+from passlib.context import CryptContext, LazyCryptContext
from passlib.exc import PasslibConfigWarning
from passlib.utils import tick, to_bytes, to_unicode
from passlib.utils.compat import irange, u
@@ -25,90 +30,105 @@ import passlib.utils.handlers as uh
from passlib.tests.utils import TestCase, mktemp, catch_warnings, \
gae_env, set_file
from passlib.registry import register_crypt_handler_path, has_crypt_handler, \
- _unload_handler_name as unload_handler_name
+ _unload_handler_name as unload_handler_name, get_crypt_handler
#module
log = getLogger(__name__)
+#=========================================================
+# support
+#=========================================================
+here = os.path.abspath(os.path.dirname(__file__))
+
+def merge_dicts(first, *args, **kwds):
+ target = first.copy()
+ for arg in args:
+ target.update(arg)
+ if kwds:
+ target.update(kwds)
+ return target
#=========================================================
#
#=========================================================
-class CryptPolicyTest(TestCase):
- "test CryptPolicy object"
-
- #TODO: need to test user categories w/in all this
+class CryptContextTest(TestCase):
+ descriptionPrefix = "CryptContext"
- descriptionPrefix = "CryptPolicy"
+ # TODO: these unittests could really use a good cleanup
+ # and reorganizing, to ensure they're getting everything.
#=========================================================
- #sample crypt policies used for testing
+ # sample configurations used in tests
#=========================================================
#-----------------------------------------------------
- #sample 1 - average config file
+ # sample 1 - typical configuration
#-----------------------------------------------------
- #NOTE: copy of this is stored in file passlib/tests/sample_config_1s.cfg
- sample_config_1s = """\
-[passlib]
-schemes = des_crypt, md5_crypt, bsdi_crypt, sha512_crypt
-default = md5_crypt
-all.vary_rounds = 10%%
-bsdi_crypt.max_rounds = 30000
-bsdi_crypt.default_rounds = 25000
-sha512_crypt.max_rounds = 50000
-sha512_crypt.min_rounds = 40000
-"""
- sample_config_1s_path = os.path.abspath(os.path.join(
- os.path.dirname(__file__), "sample_config_1s.cfg"))
- if not os.path.exists(sample_config_1s_path) and resource_filename:
- #in case we're zipped up in an egg.
- sample_config_1s_path = resource_filename("passlib.tests",
- "sample_config_1s.cfg")
-
- #make sure sample_config_1s uses \n linesep - tests rely on this
- assert sample_config_1s.startswith("[passlib]\nschemes")
-
- sample_config_1pd = dict(
- schemes = [ "des_crypt", "md5_crypt", "bsdi_crypt", "sha512_crypt"],
+ sample_1_schemes = ["des_crypt", "md5_crypt", "bsdi_crypt", "sha512_crypt"]
+ sample_1_handlers = [get_crypt_handler(name) for name in sample_1_schemes]
+
+ sample_1_dict = dict(
+ schemes = sample_1_schemes,
default = "md5_crypt",
- all__vary_rounds = "10%",
+ all__vary_rounds = 0.1,
bsdi_crypt__max_rounds = 30000,
bsdi_crypt__default_rounds = 25000,
sha512_crypt__max_rounds = 50000,
sha512_crypt__min_rounds = 40000,
)
- sample_config_1pid = {
- "schemes": "des_crypt, md5_crypt, bsdi_crypt, sha512_crypt",
- "default": "md5_crypt",
- "all.vary_rounds": "10%",
- "bsdi_crypt.max_rounds": 30000,
- "bsdi_crypt.default_rounds": 25000,
- "sha512_crypt.max_rounds": 50000,
- "sha512_crypt.min_rounds": 40000,
- }
-
- sample_config_1prd = dict(
- schemes = [ hash.des_crypt, hash.md5_crypt, hash.bsdi_crypt, hash.sha512_crypt],
- default = "md5_crypt", # NOTE: passlib <= 1.5 was handler obj.
- all__vary_rounds = "10%",
- bsdi_crypt__max_rounds = 30000,
- bsdi_crypt__default_rounds = 25000,
- sha512_crypt__max_rounds = 50000,
- sha512_crypt__min_rounds = 40000,
- )
+ sample_1_resolved_dict = merge_dicts(sample_1_dict,
+ schemes = sample_1_handlers)
+
+ sample_1_unnormalized = u("""\
+[passlib]
+schemes = des_crypt, md5_crypt, bsdi_crypt, sha512_crypt
+default = md5_crypt
+; this is using %...
+all__vary_rounds = 10%%
+; this is using 'rounds' instead of 'default_rounds'
+bsdi_crypt__rounds = 25000
+bsdi_crypt__max_rounds = 30000
+sha512_crypt__max_rounds = 50000
+sha512_crypt__min_rounds = 40000
+""")
+
+ sample_1_unicode = u("""\
+[passlib]
+schemes = des_crypt, md5_crypt, bsdi_crypt, sha512_crypt
+default = md5_crypt
+all__vary_rounds = 0.1
+bsdi_crypt__default_rounds = 25000
+bsdi_crypt__max_rounds = 30000
+sha512_crypt__max_rounds = 50000
+sha512_crypt__min_rounds = 40000
+
+""")
#-----------------------------------------------------
- #sample 2 - partial policy & result of overlay on sample 1
+ # sample 1 external files
#-----------------------------------------------------
- sample_config_2s = """\
-[passlib]
-bsdi_crypt.min_rounds = 29000
-bsdi_crypt.max_rounds = 35000
-bsdi_crypt.default_rounds = 31000
-sha512_crypt.min_rounds = 45000
-"""
- sample_config_2pd = dict(
+ # sample 1 string with '\n' linesep
+ sample_1_path = os.path.join(here, "sample1.cfg")
+
+ # sample 1 with '\r\n' linesep
+ sample_1b_unicode = sample_1_unicode.replace(u("\n"), u("\r\n"))
+ sample_1b_path = os.path.join(here, "sample1b.cfg")
+
+ # sample 1 using UTF-16 and alt section
+ sample_1c_bytes = sample_1_unicode.replace(u("[passlib]"),
+ u("[mypolicy]")).encode("utf-16")
+ sample_1c_path = os.path.join(here, "sample1c.cfg")
+
+ # enable to regenerate sample files
+ if False:
+ set_file(sample_1_path, sample_1_unicode)
+ set_file(sample_1b_path, sample_1b_unicode)
+ set_file(sample_1c_path, sample_1c_bytes)
+
+ #-----------------------------------------------------
+ # sample 2 & 12 - options patch
+ #-----------------------------------------------------
+ sample_2_dict = dict(
#using this to test full replacement of existing options
bsdi_crypt__min_rounds = 29000,
bsdi_crypt__max_rounds = 35000,
@@ -117,511 +137,891 @@ sha512_crypt.min_rounds = 45000
sha512_crypt__min_rounds=45000,
)
- sample_config_12pd = dict(
- schemes = [ "des_crypt", "md5_crypt", "bsdi_crypt", "sha512_crypt"],
- default = "md5_crypt",
- all__vary_rounds = "10%",
- bsdi_crypt__min_rounds = 29000,
- bsdi_crypt__max_rounds = 35000,
- bsdi_crypt__default_rounds = 31000,
- sha512_crypt__max_rounds = 50000,
- sha512_crypt__min_rounds=45000,
- )
+ sample_2_unicode = """\
+[passlib]
+bsdi_crypt__min_rounds = 29000
+bsdi_crypt__max_rounds = 35000
+bsdi_crypt__default_rounds = 31000
+sha512_crypt__min_rounds = 45000
+"""
+
+ # sample 2 overlayed on top of sample 1
+ sample_12_dict = merge_dicts(sample_1_dict, sample_2_dict)
#-----------------------------------------------------
- #sample 3 - just changing default
+ # sample 3 & 123 - just changing default from sample 1
#-----------------------------------------------------
- sample_config_3pd = dict(
+ sample_3_dict = dict(
default="sha512_crypt",
)
- sample_config_123pd = dict(
- schemes = [ "des_crypt", "md5_crypt", "bsdi_crypt", "sha512_crypt"],
- default = "sha512_crypt",
- all__vary_rounds = "10%",
- bsdi_crypt__min_rounds = 29000,
- bsdi_crypt__max_rounds = 35000,
- bsdi_crypt__default_rounds = 31000,
- sha512_crypt__max_rounds = 50000,
- sha512_crypt__min_rounds=45000,
- )
+ # sample 3 overlayed on 2 overlayed on 1
+ sample_123_dict = merge_dicts(sample_12_dict, sample_3_dict)
#-----------------------------------------------------
- #sample 4 - category specific
+ # sample 4 - used by api tests
#-----------------------------------------------------
- sample_config_4s = """
-[passlib]
-schemes = sha512_crypt
-all.vary_rounds = 10%%
-default.sha512_crypt.max_rounds = 20000
-admin.all.vary_rounds = 5%%
-admin.sha512_crypt.max_rounds = 40000
-"""
+ sample_4_dict = dict(
+ schemes = [ "des_crypt", "md5_crypt", "phpass", "bsdi_crypt",
+ "sha256_crypt"],
+ deprecated = [ "des_crypt", ],
+ default = "sha256_crypt",
+ bsdi_crypt__max_rounds = 30,
+ bsdi_crypt__default_rounds = 25,
+ bsdi_crypt__vary_rounds = 0,
+ sha256_crypt__max_rounds = 3000,
+ sha256_crypt__min_rounds = 2000,
+ sha256_crypt__default_rounds = 3000,
+ phpass__ident = "H",
+ phpass__default_rounds = 7,
+ )
- sample_config_4pd = dict(
- schemes = [ "sha512_crypt" ],
- all__vary_rounds = "10%",
- sha512_crypt__max_rounds = 20000,
- admin__all__vary_rounds = "5%",
- admin__sha512_crypt__max_rounds = 40000,
- )
+ #=========================================================
+ # constructors
+ #=========================================================
+ def test_01_constructor(self):
+ "test class constructor"
- #-----------------------------------------------------
- #sample 5 - to_string & deprecation testing
- #-----------------------------------------------------
- sample_config_5s = sample_config_1s + """\
-deprecated = des_crypt
-admin__context__deprecated = des_crypt, bsdi_crypt
-"""
+ # test blank constructor works correctly
+ ctx = CryptContext()
+ self.assertEqual(ctx.to_dict(), {})
- sample_config_5pd = sample_config_1pd.copy()
- sample_config_5pd.update(
- deprecated = [ "des_crypt" ],
- admin__context__deprecated = [ "des_crypt", "bsdi_crypt" ],
- )
+ # test sample 1 with scheme=names
+ ctx = CryptContext(**self.sample_1_dict)
+ self.assertEqual(ctx.to_dict(), self.sample_1_dict)
+
+ # test sample 1 with scheme=handlers
+ ctx = CryptContext(**self.sample_1_resolved_dict)
+ self.assertEqual(ctx.to_dict(), self.sample_1_dict)
+
+ # test sample 2: options w/o schemes
+ ctx = CryptContext(**self.sample_2_dict)
+ self.assertEqual(ctx.to_dict(), self.sample_2_dict)
- sample_config_5pid = sample_config_1pid.copy()
- sample_config_5pid.update({
- "deprecated": "des_crypt",
- "admin.context.deprecated": "des_crypt, bsdi_crypt",
- })
+ # test sample 3: default only
+ ctx = CryptContext(**self.sample_3_dict)
+ self.assertEqual(ctx.to_dict(), self.sample_3_dict)
- sample_config_5prd = sample_config_1prd.copy()
- sample_config_5prd.update({
- # XXX: should deprecated return the actual handlers in this case?
- # would have to modify how policy stores info, for one.
- "deprecated": ["des_crypt"],
- "admin__context__deprecated": ["des_crypt", "bsdi_crypt"],
- })
+ def test_02_from_string(self):
+ "test from_string() constructor"
+ # test sample 1 unicode
+ ctx = CryptContext.from_string(self.sample_1_unicode)
+ self.assertEqual(ctx.to_dict(), self.sample_1_dict)
+
+ # test sample 1 with unnormalized inputs
+ ctx = CryptContext.from_string(self.sample_1_unnormalized)
+ self.assertEqual(ctx.to_dict(), self.sample_1_dict)
+
+ # test sample 1 utf-8
+ ctx = CryptContext.from_string(self.sample_1_unicode.encode("utf-8"))
+ self.assertEqual(ctx.to_dict(), self.sample_1_dict)
+
+ # test sample 1 w/ '\r\n' linesep
+ ctx = CryptContext.from_string(self.sample_1b_unicode)
+ self.assertEqual(ctx.to_dict(), self.sample_1_dict)
+
+ # test sample 1 using UTF-16 and alt section
+ ctx = CryptContext.from_string(self.sample_1c_bytes, section="mypolicy",
+ encoding="utf-16")
+ self.assertEqual(ctx.to_dict(), self.sample_1_dict)
+
+ # test wrong type
+ self.assertRaises(TypeError, CryptContext.from_string, None)
+
+ # test missing section
+ self.assertRaises(NoSectionError, CryptContext.from_string,
+ self.sample_1_unicode, section="fakesection")
+
+ def test_03_from_path(self):
+ "test from_path() constructor"
+ # make sure sample files exist
+ if not os.path.exists(self.sample_1_path):
+ raise RuntimeError("can't find data file: %r" % self.sample_1_path)
+
+ # test sample 1
+ ctx = CryptContext.from_path(self.sample_1_path)
+ self.assertEqual(ctx.to_dict(), self.sample_1_dict)
+
+ # test sample 1 w/ '\r\n' linesep
+ ctx = CryptContext.from_path(self.sample_1b_path)
+ self.assertEqual(ctx.to_dict(), self.sample_1_dict)
+
+ # test sample 1 encoding using UTF-16 and alt section
+ ctx = CryptContext.from_path(self.sample_1c_path, section="mypolicy",
+ encoding="utf-16")
+ self.assertEqual(ctx.to_dict(), self.sample_1_dict)
+
+ # test missing file
+ self.assertRaises(EnvironmentError, CryptContext.from_path,
+ os.path.join(here, "sample1xxx.cfg"))
+
+ # test missing section
+ self.assertRaises(NoSectionError, CryptContext.from_path,
+ self.sample_1_path, section="fakesection")
+
+ def test_04_copy(self):
+ "test copy() method"
+ cc1 = CryptContext(**self.sample_1_dict)
+
+ # overlay sample 2 onto copy
+ cc2 = cc1.copy(**self.sample_2_dict)
+ self.assertEqual(cc1.to_dict(), self.sample_1_dict)
+ self.assertEqual(cc2.to_dict(), self.sample_12_dict)
+
+ # check that repeating overlay makes no change
+ cc2b = cc2.copy(**self.sample_2_dict)
+ self.assertEqual(cc1.to_dict(), self.sample_1_dict)
+ self.assertEqual(cc2b.to_dict(), self.sample_12_dict)
+
+ # overlay sample 3 on copy
+ cc3 = cc2.copy(**self.sample_3_dict)
+ self.assertEqual(cc3.to_dict(), self.sample_123_dict)
+
+ # test empty copy creates separate copy
+ cc4 = cc1.copy()
+ self.assertIsNot(cc4, cc1)
+ self.assertEqual(cc1.to_dict(), self.sample_1_dict)
+ self.assertEqual(cc4.to_dict(), self.sample_1_dict)
+
+ # ... and that modifying copy doesn't affect original
+ cc4.update(**self.sample_2_dict)
+ self.assertEqual(cc1.to_dict(), self.sample_1_dict)
+ self.assertEqual(cc4.to_dict(), self.sample_12_dict)
#=========================================================
- #constructors
+ # modifiers
#=========================================================
- def test_00_constructor(self):
- "test CryptPolicy() constructor"
- policy = CryptPolicy(**self.sample_config_1pd)
- self.assertEqual(policy.to_dict(), self.sample_config_1pd)
-
- #check key with too many separators is rejected
- self.assertRaises(TypeError, CryptPolicy,
- schemes = [ "des_crypt", "md5_crypt", "bsdi_crypt", "sha512_crypt"],
- bad__key__bsdi_crypt__max_rounds = 30000,
+ def test_10_load(self):
+ "test load() / load_path() method"
+ # NOTE: load() is the workhorse that handles all policy parsing,
+ # compilation, and validation. most of it's features are tested
+ # elsewhere, since all the constructors and modifiers are just
+ # wrappers for it.
+
+ # source_type 'auto'
+ ctx = CryptContext()
+
+ # detect dict
+ ctx.load(self.sample_1_dict)
+ self.assertEqual(ctx.to_dict(), self.sample_1_dict)
+
+ # detect unicode string
+ ctx.load(self.sample_1_unicode)
+ self.assertEqual(ctx.to_dict(), self.sample_1_dict)
+
+ # detect bytes string
+ ctx.load(self.sample_1_unicode.encode("utf-8"))
+ self.assertEqual(ctx.to_dict(), self.sample_1_dict)
+
+ # anything else - TypeError
+ self.assertRaises(TypeError, ctx.load, None)
+
+ # NOTE: load_path() tested by from_path()
+ # NOTE: additional string tests done by from_string()
+
+ # update flag - tested by update() method tests
+ # encoding keyword - tested by from_string() & from_path()
+ # section keyword - tested by from_string() & from_path()
+
+ # multiple loads should clear the state
+ ctx = CryptContext()
+ ctx.load(self.sample_1_dict)
+ ctx.load(self.sample_2_dict)
+ self.assertEqual(ctx.to_dict(), self.sample_2_dict)
+
+ def test_11_load_rollback(self):
+ "test load() errors restore old state"
+ # create initial context
+ cc = CryptContext(["des_crypt", "sha256_crypt"],
+ sha256_crypt__default_rounds=5000,
+ all__vary_rounds=0.1,
)
+ result = cc.to_string()
- #check nameless handler rejected
- class nameless(uh.StaticHandler):
- name = None
- self.assertRaises(ValueError, CryptPolicy, schemes=[nameless])
+ # do an update operation that should fail during parsing
+ # XXX: not sure what the right error type is here.
+ self.assertRaises(TypeError, cc.update, too__many__key__parts=True)
+ self.assertEqual(cc.to_string(), result)
- # check scheme must be name or crypt handler
- self.assertRaises(TypeError, CryptPolicy, schemes=[uh.StaticHandler])
+ # do an update operation that should fail during extraction
+ # FIXME: this isn't failing even in broken case, need to figure out
+ # way to ensure some keys come after this one.
+ self.assertRaises(KeyError, cc.update, fake_context_option=True)
+ self.assertEqual(cc.to_string(), result)
- #check name conflicts are rejected
- class dummy_1(uh.StaticHandler):
- name = 'dummy_1'
- self.assertRaises(KeyError, CryptPolicy, schemes=[dummy_1, dummy_1])
+ # do an update operation that should fail during compilation
+ self.assertRaises(ValueError, cc.update, sha256_crypt__min_rounds=10000)
+ self.assertEqual(cc.to_string(), result)
- #with unknown deprecated value
- self.assertRaises(KeyError, CryptPolicy,
- schemes=['des_crypt'],
- deprecated=['md5_crypt'])
+ def test_12_update(self):
+ "test update() method"
- #with unknown default value
- self.assertRaises(KeyError, CryptPolicy,
- schemes=['des_crypt'],
- default='md5_crypt')
+ # empty overlay
+ ctx = CryptContext(**self.sample_1_dict)
+ ctx.update()
+ self.assertEqual(ctx.to_dict(), self.sample_1_dict)
- def test_01_from_path_simple(self):
- "test CryptPolicy.from_path() constructor"
- #NOTE: this is separate so it can also run under GAE
+ # test basic overlay
+ ctx = CryptContext(**self.sample_1_dict)
+ ctx.update(**self.sample_2_dict)
+ self.assertEqual(ctx.to_dict(), self.sample_12_dict)
- #test preset stored in existing file
- path = self.sample_config_1s_path
- policy = CryptPolicy.from_path(path)
- self.assertEqual(policy.to_dict(), self.sample_config_1pd)
+ # ... and again
+ ctx.update(**self.sample_3_dict)
+ self.assertEqual(ctx.to_dict(), self.sample_123_dict)
- #test if path missing
- self.assertRaises(EnvironmentError, CryptPolicy.from_path, path + 'xxx')
+ # overlay w/ dict arg
+ ctx = CryptContext(**self.sample_1_dict)
+ ctx.update(self.sample_2_dict)
+ self.assertEqual(ctx.to_dict(), self.sample_12_dict)
- def test_01_from_path(self):
- "test CryptPolicy.from_path() constructor with encodings"
- if gae_env:
- return self.skipTest("GAE doesn't offer read/write filesystem access")
+ # overlay w/ string
+ ctx = CryptContext(**self.sample_1_dict)
+ ctx.update(self.sample_2_unicode)
+ self.assertEqual(ctx.to_dict(), self.sample_12_dict)
- path = mktemp()
+ # too many args
+ self.assertRaises(TypeError, ctx.update, {}, {})
+ self.assertRaises(TypeError, ctx.update, {}, schemes=['des_crypt'])
- #test "\n" linesep
- set_file(path, self.sample_config_1s)
- policy = CryptPolicy.from_path(path)
- self.assertEqual(policy.to_dict(), self.sample_config_1pd)
+ # wrong arg type
+ self.assertRaises(TypeError, ctx.update, None)
- #test "\r\n" linesep
- set_file(path, self.sample_config_1s.replace("\n","\r\n"))
- policy = CryptPolicy.from_path(path)
- self.assertEqual(policy.to_dict(), self.sample_config_1pd)
+ #=========================================================
+ # option parsing
+ #=========================================================
+ def test_20_options(self):
+ "test basic option parsing"
+ def parse(**kwds):
+ return CryptContext(**kwds).to_dict()
- #test with custom encoding
- uc2 = to_bytes(self.sample_config_1s, "utf-16", source_encoding="utf-8")
- set_file(path, uc2)
- policy = CryptPolicy.from_path(path, encoding="utf-16")
- self.assertEqual(policy.to_dict(), self.sample_config_1pd)
+ #
+ # common option parsing tests
+ #
- def test_02_from_string(self):
- "test CryptPolicy.from_string() constructor"
- #test "\n" linesep
- policy = CryptPolicy.from_string(self.sample_config_1s)
- self.assertEqual(policy.to_dict(), self.sample_config_1pd)
-
- #test "\r\n" linesep
- policy = CryptPolicy.from_string(
- self.sample_config_1s.replace("\n","\r\n"))
- self.assertEqual(policy.to_dict(), self.sample_config_1pd)
-
- #test with unicode
- data = to_unicode(self.sample_config_1s)
- policy = CryptPolicy.from_string(data)
- self.assertEqual(policy.to_dict(), self.sample_config_1pd)
-
- #test with non-ascii-compatible encoding
- uc2 = to_bytes(self.sample_config_1s, "utf-16", source_encoding="utf-8")
- policy = CryptPolicy.from_string(uc2, encoding="utf-16")
- self.assertEqual(policy.to_dict(), self.sample_config_1pd)
-
- #test category specific options
- policy = CryptPolicy.from_string(self.sample_config_4s)
- self.assertEqual(policy.to_dict(), self.sample_config_4pd)
-
- def test_03_from_source(self):
- "test CryptPolicy.from_source() constructor"
- #pass it a path
- policy = CryptPolicy.from_source(self.sample_config_1s_path)
- self.assertEqual(policy.to_dict(), self.sample_config_1pd)
-
- #pass it a string
- policy = CryptPolicy.from_source(self.sample_config_1s)
- self.assertEqual(policy.to_dict(), self.sample_config_1pd)
-
- #pass it a dict (NOTE: make a copy to detect in-place modifications)
- policy = CryptPolicy.from_source(self.sample_config_1pd.copy())
- self.assertEqual(policy.to_dict(), self.sample_config_1pd)
-
- #pass it existing policy
- p2 = CryptPolicy.from_source(policy)
- self.assertIs(policy, p2)
-
- #pass it something wrong
- self.assertRaises(TypeError, CryptPolicy.from_source, 1)
- self.assertRaises(TypeError, CryptPolicy.from_source, [])
-
- def test_04_from_sources(self):
- "test CryptPolicy.from_sources() constructor"
-
- #pass it empty list
- self.assertRaises(ValueError, CryptPolicy.from_sources, [])
-
- #pass it one-element list
- policy = CryptPolicy.from_sources([self.sample_config_1s])
- self.assertEqual(policy.to_dict(), self.sample_config_1pd)
-
- #pass multiple sources
- policy = CryptPolicy.from_sources(
- [
- self.sample_config_1s_path,
- self.sample_config_2s,
- self.sample_config_3pd,
- ])
- self.assertEqual(policy.to_dict(), self.sample_config_123pd)
-
- def test_05_replace(self):
- "test CryptPolicy.replace() constructor"
-
- p1 = CryptPolicy(**self.sample_config_1pd)
-
- #check overlaying sample 2
- p2 = p1.replace(**self.sample_config_2pd)
- self.assertEqual(p2.to_dict(), self.sample_config_12pd)
-
- #check repeating overlay makes no change
- p2b = p2.replace(**self.sample_config_2pd)
- self.assertEqual(p2b.to_dict(), self.sample_config_12pd)
-
- #check overlaying sample 3
- p3 = p2.replace(self.sample_config_3pd)
- self.assertEqual(p3.to_dict(), self.sample_config_123pd)
-
- def test_06_forbidden(self):
- "test CryptPolicy() forbidden kwds"
-
- #salt not allowed to be set
- self.assertRaises(KeyError, CryptPolicy,
- schemes=["des_crypt"],
- des_crypt__salt="xx",
- )
- self.assertRaises(KeyError, CryptPolicy,
- schemes=["des_crypt"],
- all__salt="xx",
- )
+ # test key with blank separators is rejected
+ self.assertRaises(TypeError, CryptContext, __=0.1)
+ self.assertRaises(TypeError, CryptContext, __default='x')
+ self.assertRaises(TypeError, CryptContext, default____default='x')
+ self.assertRaises(TypeError, CryptContext, __default____default='x')
- #schemes not allowed for category
- self.assertRaises(KeyError, CryptPolicy,
- schemes=["des_crypt"],
- user__context__schemes=["md5_crypt"],
- )
+ # test key with too many separators is rejected
+ self.assertRaises(TypeError, CryptContext,
+ category__scheme__option__invalid = 30000)
- #=========================================================
- #reading
- #=========================================================
- def test_10_has_schemes(self):
- "test has_schemes() method"
+ #
+ # context option -specific tests
+ #
- p1 = CryptPolicy(**self.sample_config_1pd)
- self.assertTrue(p1.has_schemes())
+ # test context option key parsing
+ result = dict(default="md5_crypt")
+ self.assertEqual(parse(default="md5_crypt"), result)
+ self.assertEqual(parse(context__default="md5_crypt"), result)
+ self.assertEqual(parse(default__context__default="md5_crypt"), result)
+ self.assertEqual(parse(**{"context.default":"md5_crypt"}), result)
+ self.assertEqual(parse(**{"default.context.default":"md5_crypt"}), result)
- p3 = CryptPolicy(**self.sample_config_3pd)
- self.assertTrue(not p3.has_schemes())
+ # test context option key parsing w/ category
+ result = dict(admin__context__default="md5_crypt")
+ self.assertEqual(parse(admin__context__default="md5_crypt"), result)
+ self.assertEqual(parse(**{"admin.context.default":"md5_crypt"}), result)
- def test_11_iter_handlers(self):
- "test iter_handlers() method"
+ #
+ # hash option -specific tests
+ #
- p1 = CryptPolicy(**self.sample_config_1pd)
- s = self.sample_config_1prd['schemes']
- self.assertEqual(list(p1.iter_handlers()), s)
+ # test hash option key parsing
+ result = dict(all__vary_rounds=0.1)
+ self.assertEqual(parse(all__vary_rounds=0.1), result)
+ self.assertEqual(parse(default__all__vary_rounds=0.1), result)
+ self.assertEqual(parse(**{"all.vary_rounds":0.1}), result)
+ self.assertEqual(parse(**{"default.all.vary_rounds":0.1}), result)
- p3 = CryptPolicy(**self.sample_config_3pd)
- self.assertEqual(list(p3.iter_handlers()), [])
+ # test hash option key parsing w/ category
+ result = dict(admin__all__vary_rounds=0.1)
+ self.assertEqual(parse(admin__all__vary_rounds=0.1), result)
+ self.assertEqual(parse(**{"admin.all.vary_rounds":0.1}), result)
- def test_12_get_handler(self):
- "test get_handler() method"
+ # settings not allowed if not in hash.settings_kwds
+ ctx = CryptContext(["phpass", "md5_crypt"], phpass__ident="P")
+ self.assertRaises(KeyError, ctx.copy, md5_crypt__ident="P")
- p1 = CryptPolicy(**self.sample_config_1pd)
+ # hash options 'salt' and 'rounds' not allowed
+ self.assertRaises(KeyError, CryptContext, schemes=["des_crypt"],
+ des_crypt__salt="xx")
+ self.assertRaises(KeyError, CryptContext, schemes=["des_crypt"],
+ all__salt="xx")
- #check by name
- self.assertIs(p1.get_handler("bsdi_crypt"), hash.bsdi_crypt)
+ def test_21_schemes(self):
+ "test 'schemes' context option parsing"
- #check by missing name
- self.assertIs(p1.get_handler("sha256_crypt"), None)
- self.assertRaises(KeyError, p1.get_handler, "sha256_crypt", required=True)
+ # schemes can be empty
+ cc = CryptContext(schemes=None)
+ self.assertEqual(cc.schemes(), ())
- #check default
- self.assertIs(p1.get_handler(), hash.md5_crypt)
+ # schemes can be list of names
+ cc = CryptContext(schemes=["des_crypt", "md5_crypt"])
+ self.assertEqual(cc.schemes(), ("des_crypt", "md5_crypt"))
- def test_13_get_options(self):
- "test get_options() method"
+ # schemes can be comma-sep string
+ cc = CryptContext(schemes=" des_crypt, md5_crypt, ")
+ self.assertEqual(cc.schemes(), ("des_crypt", "md5_crypt"))
- p12 = CryptPolicy(**self.sample_config_12pd)
+ # schemes can be list of handlers
+ cc = CryptContext(schemes=[hash.des_crypt, hash.md5_crypt])
+ self.assertEqual(cc.schemes(), ("des_crypt", "md5_crypt"))
- self.assertEqual(p12.get_options("bsdi_crypt"),dict(
- vary_rounds = "10%",
- min_rounds = 29000,
- max_rounds = 35000,
- default_rounds = 31000,
- ))
+ # scheme must be name or handler
+ self.assertRaises(TypeError, CryptContext, schemes=[uh.StaticHandler])
- self.assertEqual(p12.get_options("sha512_crypt"),dict(
- vary_rounds = "10%",
- min_rounds = 45000,
- max_rounds = 50000,
- ))
+ # handlers must have a name
+ class nameless(uh.StaticHandler):
+ name = None
+ self.assertRaises(ValueError, CryptContext, schemes=[nameless])
+
+ # names must be unique
+ class dummy_1(uh.StaticHandler):
+ name = 'dummy_1'
+ self.assertRaises(KeyError, CryptContext, schemes=[dummy_1, dummy_1])
+
+ # schemes not allowed per-category
+ self.assertRaises(KeyError, CryptContext,
+ admin__context__schemes=["md5_crypt"])
+
+ def test_22_deprecated(self):
+ "test 'deprecated' context option parsing"
+ def getdep(ctx, category=None):
+ return [name for name in ctx.schemes()
+ if ctx._is_deprecated_scheme(name, category)]
+
+ # no schemes - all deprecated values allowed
+ cc = CryptContext(deprecated=["md5_crypt"])
+ cc.update(schemes=["md5_crypt", "des_crypt"])
+ self.assertEqual(getdep(cc),["md5_crypt"])
+
+ # deprecated values allowed if subset of schemes
+ cc = CryptContext(deprecated=["md5_crypt"], schemes=["md5_crypt", "des_crypt"])
+ self.assertEqual(getdep(cc), ["md5_crypt"])
+
+ # can be handler
+ # XXX: allow handlers in deprecated list? not for now.
+ self.assertRaises(TypeError, CryptContext, deprecated=[hash.md5_crypt],
+ schemes=["md5_crypt", "des_crypt"])
+## cc = CryptContext(deprecated=[hash.md5_crypt], schemes=["md5_crypt", "des_crypt"])
+## self.assertEqual(getdep(cc), ["md5_crypt"])
+
+ # comma sep list
+ cc = CryptContext(deprecated="md5_crypt,des_crypt", schemes=["md5_crypt", "des_crypt"])
+ self.assertEqual(getdep(cc), ["md5_crypt", "des_crypt"])
+
+ # values outside of schemes not allowed
+ self.assertRaises(KeyError, CryptContext, schemes=['des_crypt'],
+ deprecated=['md5_crypt'])
+
+ # wrong type
+ self.assertRaises(TypeError, CryptContext, deprecated=123)
+
+ # deprecated per-category
+ cc = CryptContext(deprecated=["md5_crypt"],
+ schemes=["md5_crypt", "des_crypt"],
+ admin__context__deprecated=["des_crypt"],
+ )
+ self.assertEqual(getdep(cc), ["md5_crypt"])
+ self.assertEqual(getdep(cc, "user"), ["md5_crypt"])
+ self.assertEqual(getdep(cc, "admin"), ["des_crypt"])
+
+ def test_23_default(self):
+ "test 'default' context option parsing"
+
+ # anything allowed if no schemes
+ self.assertEqual(CryptContext(default="md5_crypt").to_dict(),
+ dict(default="md5_crypt"))
+
+ # default allowed if in scheme list
+ ctx = CryptContext(default="md5_crypt", schemes=["des_crypt", "md5_crypt"])
+ self.assertEqual(ctx.default_scheme(), "md5_crypt")
- p4 = CryptPolicy.from_string(self.sample_config_4s)
- self.assertEqual(p4.get_options("sha512_crypt"), dict(
- vary_rounds="10%",
+ # default can be handler
+ # XXX: sure we want to allow this ? maybe deprecate in future.
+ ctx = CryptContext(default=hash.md5_crypt, schemes=["des_crypt", "md5_crypt"])
+ self.assertEqual(ctx.default_scheme(), "md5_crypt")
+
+ # error if not in scheme list
+ self.assertRaises(KeyError, CryptContext, schemes=['des_crypt'],
+ default='md5_crypt')
+
+ # wrong type
+ self.assertRaises(TypeError, CryptContext, default=1)
+
+ # per-category
+ ctx = CryptContext(default="des_crypt",
+ schemes=["des_crypt", "md5_crypt"],
+ admin__context__default="md5_crypt")
+ self.assertEqual(ctx.default_scheme(), "des_crypt")
+ self.assertEqual(ctx.default_scheme("user"), "des_crypt")
+ self.assertEqual(ctx.default_scheme("admin"), "md5_crypt")
+
+ def test_24_vary_rounds(self):
+ "test 'vary_rounds' hash option parsing"
+ def parse(v):
+ return CryptContext(all__vary_rounds=v).to_dict()['all__vary_rounds']
+
+ # floats should be preserved
+ self.assertEqual(parse(0.1), 0.1)
+ self.assertEqual(parse('0.1'), 0.1)
+
+ # 'xx%' should be converted to float
+ self.assertEqual(parse('10%'), 0.1)
+
+ # ints should be preserved
+ self.assertEqual(parse(1000), 1000)
+ self.assertEqual(parse('1000'), 1000)
+
+ #=========================================================
+ # inspection & serialization
+ #=========================================================
+ def test_30_schemes(self):
+ "test schemes() method"
+ # NOTE: also checked under test_21
+
+ # test empty
+ ctx = CryptContext()
+ self.assertEqual(ctx.schemes(), ())
+ self.assertEqual(ctx.schemes(resolve=True), ())
+
+ # test sample 1
+ ctx = CryptContext(**self.sample_1_dict)
+ self.assertEqual(ctx.schemes(), tuple(self.sample_1_schemes))
+ self.assertEqual(ctx.schemes(resolve=True), tuple(self.sample_1_handlers))
+
+ # test sample 2
+ ctx = CryptContext(**self.sample_2_dict)
+ self.assertEqual(ctx.schemes(), ())
+
+ def test_31_default_scheme(self):
+ "test default_scheme() method"
+ # NOTE: also checked under test_23
+
+ # test empty
+ ctx = CryptContext()
+ self.assertRaises(KeyError, ctx.default_scheme)
+
+ # test sample 1
+ ctx = CryptContext(**self.sample_1_dict)
+ self.assertEqual(ctx.default_scheme(), "md5_crypt")
+ self.assertEqual(ctx.default_scheme(resolve=True), hash.md5_crypt)
+
+ # test sample 2
+ ctx = CryptContext(**self.sample_2_dict)
+ self.assertRaises(KeyError, ctx.default_scheme)
+
+ # test defaults to first in scheme
+ ctx = CryptContext(schemes=self.sample_1_schemes)
+ self.assertEqual(ctx.default_scheme(), "des_crypt")
+
+ # categories tested under test_23
+
+ def test_32_handler(self):
+ "test handler() method"
+
+ # default for empty
+ ctx = CryptContext()
+ self.assertRaises(KeyError, ctx.handler)
+
+ # default for sample 1
+ ctx = CryptContext(**self.sample_1_dict)
+ self.assertEqual(ctx.handler(), hash.md5_crypt)
+
+ # by name
+ self.assertEqual(ctx.handler("des_crypt"), hash.des_crypt)
+
+ # name not in schemes
+ self.assertRaises(KeyError, ctx.handler, "mysql323")
+
+ # TODO: per-category
+
+ def test_33_options(self):
+ "test internal _get_record_options() method"
+ def options(ctx, scheme, category=None):
+ return ctx._get_record_options(scheme, category)[0]
+
+ # this checks that (3 schemes, 3 categories) inherit options correctly.
+ # the 'user' category is not present in the options.
+ cc4 = CryptContext(
+ schemes = [ "sha512_crypt", "des_crypt", "bsdi_crypt"],
+ deprecated = ["sha512_crypt", "des_crypt"],
+ all__vary_rounds = 0.1,
+ bsdi_crypt__vary_rounds=0.2,
+ sha512_crypt__max_rounds = 20000,
+ admin__context__deprecated = [ "des_crypt", "bsdi_crypt" ],
+ admin__all__vary_rounds = 0.05,
+ admin__bsdi_crypt__vary_rounds=0.3,
+ admin__sha512_crypt__max_rounds = 40000,
+ )
+ self.assertEqual(cc4._categories, ("admin",))
+
+ #
+ # sha512_crypt
+ #
+ self.assertEqual(options(cc4, "sha512_crypt"), dict(
+ deprecated=True,
+ vary_rounds=0.1, # inherited from all__
max_rounds=20000,
))
- self.assertEqual(p4.get_options("sha512_crypt", "user"), dict(
- vary_rounds="10%",
+ self.assertEqual(options(cc4, "sha512_crypt", "user"), dict(
+ deprecated=True, # unconfigured category inherits from default
+ vary_rounds=0.1,
max_rounds=20000,
))
- self.assertEqual(p4.get_options("sha512_crypt", "admin"), dict(
- vary_rounds="5%",
- max_rounds=40000,
+ self.assertEqual(options(cc4, "sha512_crypt", "admin"), dict(
+ # NOT deprecated - context option overridden per-category
+ vary_rounds=0.05, # global overridden per-cateogry
+ max_rounds=40000, # overridden per-category
))
- def test_14_handler_is_deprecated(self):
- "test handler_is_deprecated() method"
- pa = CryptPolicy(**self.sample_config_1pd)
- pb = CryptPolicy(**self.sample_config_5pd)
-
- self.assertFalse(pa.handler_is_deprecated("des_crypt"))
- self.assertFalse(pa.handler_is_deprecated(hash.bsdi_crypt))
- self.assertFalse(pa.handler_is_deprecated("sha512_crypt"))
-
- self.assertTrue(pb.handler_is_deprecated("des_crypt"))
- self.assertFalse(pb.handler_is_deprecated(hash.bsdi_crypt))
- self.assertFalse(pb.handler_is_deprecated("sha512_crypt"))
-
- #check categories as well
- self.assertTrue(pb.handler_is_deprecated("des_crypt", "user"))
- self.assertFalse(pb.handler_is_deprecated("bsdi_crypt", "user"))
- self.assertTrue(pb.handler_is_deprecated("des_crypt", "admin"))
- self.assertTrue(pb.handler_is_deprecated("bsdi_crypt", "admin"))
-
- # check deprecation is overridden per category
- pc = CryptPolicy(
- schemes=["md5_crypt", "des_crypt"],
- deprecated=["md5_crypt"],
- user__context__deprecated=["des_crypt"],
- )
- self.assertTrue(pc.handler_is_deprecated("md5_crypt"))
- self.assertFalse(pc.handler_is_deprecated("des_crypt"))
- self.assertFalse(pc.handler_is_deprecated("md5_crypt", "user"))
- self.assertTrue(pc.handler_is_deprecated("des_crypt", "user"))
+ #
+ # des_crypt
+ #
+ self.assertEqual(options(cc4, "des_crypt"), dict(
+ deprecated=True,
+ vary_rounds=0.1,
+ ))
- def test_15_min_verify_time(self):
- "test get_min_verify_time() method"
- # silence deprecation warnings for min verify time
- warnings.filterwarnings("ignore", category=DeprecationWarning)
+ self.assertEqual(options(cc4, "des_crypt", "user"), dict(
+ deprecated=True, # unconfigured category inherits from default
+ vary_rounds=0.1,
+ ))
- pa = CryptPolicy()
- self.assertEqual(pa.get_min_verify_time(), 0)
- self.assertEqual(pa.get_min_verify_time('admin'), 0)
+ self.assertEqual(options(cc4, "des_crypt", "admin"), dict(
+ deprecated=True, # unchanged though overidden
+ vary_rounds=0.05, # global overridden per-cateogry
+ ))
- pb = pa.replace(min_verify_time=.1)
- self.assertEqual(pb.get_min_verify_time(), .1)
- self.assertEqual(pb.get_min_verify_time('admin'), .1)
+ #
+ # bsdi_crypt
+ #
+ self.assertEqual(options(cc4, "bsdi_crypt"), dict(
+ vary_rounds=0.2, # overridden from all__vary_rounds
+ ))
- pc = pa.replace(admin__context__min_verify_time=.2)
- self.assertEqual(pc.get_min_verify_time(), 0)
- self.assertEqual(pc.get_min_verify_time('admin'), .2)
+ self.assertEqual(options(cc4, "bsdi_crypt", "user"), dict(
+ vary_rounds=0.2, # unconfigured category inherits from default
+ ))
- pd = pb.replace(admin__context__min_verify_time=.2)
- self.assertEqual(pd.get_min_verify_time(), .1)
- self.assertEqual(pd.get_min_verify_time('admin'), .2)
+ self.assertEqual(options(cc4, "bsdi_crypt", "admin"), dict(
+ vary_rounds=0.3,
+ deprecated=True, # deprecation set per-category
+ ))
- #=========================================================
- #serialization
- #=========================================================
- def test_20_iter_config(self):
- "test iter_config() method"
- p5 = CryptPolicy(**self.sample_config_5pd)
- self.assertEqual(dict(p5.iter_config()), self.sample_config_5pd)
- self.assertEqual(dict(p5.iter_config(resolve=True)), self.sample_config_5prd)
- self.assertEqual(dict(p5.iter_config(ini=True)), self.sample_config_5pid)
-
- def test_21_to_dict(self):
+ def test_34_to_dict(self):
"test to_dict() method"
- p5 = CryptPolicy(**self.sample_config_5pd)
- self.assertEqual(p5.to_dict(), self.sample_config_5pd)
- self.assertEqual(p5.to_dict(resolve=True), self.sample_config_5prd)
+ # NOTE: this is tested all throughout this test case.
+ ctx = CryptContext(**self.sample_1_dict)
+ self.assertEqual(ctx.to_dict(), self.sample_1_dict)
+ self.assertEqual(ctx.to_dict(resolve=True), self.sample_1_resolved_dict)
- def test_22_to_string(self):
+ def test_35_to_string(self):
"test to_string() method"
- pa = CryptPolicy(**self.sample_config_5pd)
- s = pa.to_string() #NOTE: can't compare string directly, ordering etc may not match
- pb = CryptPolicy.from_string(s)
- self.assertEqual(pb.to_dict(), self.sample_config_5pd)
- #=========================================================
- #
- #=========================================================
+ # create ctx and serialize
+ ctx = CryptContext(**self.sample_1_dict)
+ dump = ctx.to_string()
-#=========================================================
-#CryptContext
-#=========================================================
-class CryptContextTest(TestCase):
- "test CryptContext class"
- descriptionPrefix = "CryptContext"
+ # check ctx->string returns canonical format.
+ # NOTE: ConfigParser for PY26 and earlier didn't use OrderedDict,
+ # so to_string() won't get order correct.
+ # so we skip this test.
+ import sys
+ if sys.version_info >= (2,7):
+ self.assertEqual(dump, self.sample_1_unicode)
+
+ # check ctx->string->ctx->dict returns original
+ ctx2 = CryptContext.from_string(dump)
+ self.assertEqual(ctx2.to_dict(), self.sample_1_dict)
+
+ # TODO: test other features, like the unmanaged handler warning.
+ # TODO: test compact mode, section
#=========================================================
- #constructor
+ # password hash api
#=========================================================
- def test_00_constructor(self):
- "test constructor"
- #create crypt context using handlers
- cc = CryptContext([hash.md5_crypt, hash.bsdi_crypt, hash.des_crypt])
- c,b,a = cc.policy.iter_handlers()
- self.assertIs(a, hash.des_crypt)
- self.assertIs(b, hash.bsdi_crypt)
- self.assertIs(c, hash.md5_crypt)
-
- #create context using names
- cc = CryptContext(["md5_crypt", "bsdi_crypt", "des_crypt"])
- c,b,a = cc.policy.iter_handlers()
- self.assertIs(a, hash.des_crypt)
- self.assertIs(b, hash.bsdi_crypt)
- self.assertIs(c, hash.md5_crypt)
-
- #TODO: test policy & other options
-
- def test_01_replace(self):
- "test replace()"
-
- cc = CryptContext(["md5_crypt", "bsdi_crypt", "des_crypt"])
- self.assertIs(cc.policy.get_handler(), hash.md5_crypt)
-
- cc2 = cc.replace()
- self.assertIsNot(cc2, cc)
- self.assertIs(cc2.policy, cc.policy)
-
- cc3 = cc.replace(default="bsdi_crypt")
- self.assertIsNot(cc3, cc)
- self.assertIsNot(cc3.policy, cc.policy)
- self.assertIs(cc3.policy.get_handler(), hash.bsdi_crypt)
-
- def test_02_no_handlers(self):
- "test no handlers"
-
- #check constructor...
- cc = CryptContext()
- self.assertRaises(KeyError, cc.identify, 'hash', required=True)
- self.assertRaises(KeyError, cc.encrypt, 'secret')
- self.assertRaises(KeyError, cc.verify, 'secret', 'hash')
+ nonstring_vectors = [
+ (None, {}),
+ (None, {"scheme": "des_crypt"}),
+ (1, {}),
+ ((), {}),
+ ]
+
+ def test_40_basic(self):
+ "test basic encrypt/identify/verify functionality"
+ handlers = [hash.md5_crypt, hash.des_crypt, hash.bsdi_crypt]
+ cc = CryptContext(handlers)
+
+ #run through handlers
+ for crypt in handlers:
+ h = cc.encrypt("test", scheme=crypt.name)
+ self.assertEqual(cc.identify(h), crypt.name)
+ self.assertEqual(cc.identify(h, resolve=True), crypt)
+ self.assertTrue(cc.verify('test', h))
+ self.assertTrue(not cc.verify('notest', h))
- #check updating policy after the fact...
- cc = CryptContext(['md5_crypt'])
- p = CryptPolicy(schemes=[])
- cc.policy = p
+ #test default
+ h = cc.encrypt("test")
+ self.assertEqual(cc.identify(h), "md5_crypt")
- self.assertRaises(KeyError, cc.identify, 'hash', required=True)
- self.assertRaises(KeyError, cc.encrypt, 'secret')
- self.assertRaises(KeyError, cc.verify, 'secret', 'hash')
+ #test genhash
+ h = cc.genhash('secret', cc.genconfig())
+ self.assertEqual(cc.identify(h), 'md5_crypt')
- #=========================================================
- #policy adaptation
- #=========================================================
- sample_policy_1 = dict(
- schemes = [ "des_crypt", "md5_crypt", "phpass", "bsdi_crypt",
- "sha256_crypt"],
- deprecated = [ "des_crypt", ],
- default = "sha256_crypt",
- bsdi_crypt__max_rounds = 30,
- bsdi_crypt__default_rounds = 25,
- bsdi_crypt__vary_rounds = 0,
- sha256_crypt__max_rounds = 3000,
- sha256_crypt__min_rounds = 2000,
- sha256_crypt__default_rounds = 3000,
- phpass__ident = "H",
- phpass__default_rounds = 7,
- )
+ h = cc.genhash('secret', cc.genconfig(), scheme='md5_crypt')
+ self.assertEqual(cc.identify(h), 'md5_crypt')
- def test_10_01_genconfig_settings(self):
- "test genconfig() settings"
- cc = CryptContext(policy=None,
- schemes=["md5_crypt", "phpass"],
+ self.assertRaises(ValueError, cc.genhash, 'secret', cc.genconfig(), scheme="des_crypt")
+
+ def test_41_genconfig(self):
+ "test genconfig() method"
+ cc = CryptContext(schemes=["md5_crypt", "phpass"],
phpass__ident="H",
phpass__default_rounds=7,
)
- # hash specific settings
+ # uses default scheme
self.assertTrue(cc.genconfig().startswith("$1$"))
- self.assertEqual(
- cc.genconfig(scheme="phpass", salt='.'*8),
- '$H$5........',
- )
+
+ # override scheme
+ self.assertTrue(cc.genconfig(scheme="phpass").startswith("$H$5"))
+
+ # override scheme & custom settings
self.assertEqual(
cc.genconfig(scheme="phpass", salt='.'*8, rounds=8, ident='P'),
'$P$6........',
)
- # unsupported hash settings should be rejected
- self.assertRaises(KeyError, cc.replace, md5_crypt__ident="P")
+ #--------------------------------------------------------------
+ # border cases
+ #--------------------------------------------------------------
+
+ # throws error without schemes
+ self.assertRaises(KeyError, CryptContext().genconfig)
+
+ def test_42_genhash(self):
+ "test genhash() method"
+
+ #--------------------------------------------------------------
+ # border cases
+ #--------------------------------------------------------------
+
+ # rejects non-string secrets
+ cc = CryptContext(["des_crypt"])
+ hash = cc.encrypt('stub')
+ for secret, kwds in self.nonstring_vectors:
+ self.assertRaises(TypeError, cc.genhash, secret, hash, **kwds)
+
+ # rejects non-string hashes
+ cc = CryptContext(["des_crypt"])
+ for hash, kwds in self.nonstring_vectors:
+ self.assertRaises(TypeError, cc.genhash, 'secret', hash, **kwds)
+
+ # .. but should accept None if default scheme lacks config string
+ cc = CryptContext(["mysql323"])
+ self.assertIsInstance(cc.genhash("stub", None), str)
+
+ # throws error without schemes
+ self.assertRaises(KeyError, CryptContext().genhash, 'secret', 'hash')
+
+ def test_43_encrypt(self):
+ "test encrypt() method"
+ cc = CryptContext(**self.sample_4_dict)
+
+ # hash specific settings
+ self.assertEqual(
+ cc.encrypt("password", scheme="phpass", salt='.'*8),
+ '$H$5........De04R5Egz0aq8Tf.1eVhY/',
+ )
+ self.assertEqual(
+ cc.encrypt("password", scheme="phpass", salt='.'*8, ident="P"),
+ '$P$5........De04R5Egz0aq8Tf.1eVhY/',
+ )
+
+ # NOTE: more thorough job of rounds limits done below.
- def test_10_02_genconfig_rounds_limits(self):
- "test genconfig() policy rounds limits"
- cc = CryptContext(policy=None,
- schemes=["sha256_crypt"],
+ # min rounds
+ with catch_warnings(record=True) as wlog:
+ self.assertEqual(
+ cc.encrypt("password", rounds=1999, salt="nacl"),
+ '$5$rounds=2000$nacl$9/lTZ5nrfPuz8vphznnmHuDGFuvjSNvOEDsGmGfsS97',
+ )
+ self.consumeWarningList(wlog, PasslibConfigWarning)
+
+ self.assertEqual(
+ cc.encrypt("password", rounds=2001, salt="nacl"),
+ '$5$rounds=2001$nacl$8PdeoPL4aXQnJ0woHhqgIw/efyfCKC2WHneOpnvF.31'
+ )
+ self.consumeWarningList(wlog)
+
+ # NOTE: max rounds, etc tested in genconfig()
+
+ # make default > max throws error if attempted
+ self.assertRaises(ValueError, cc.copy,
+ sha256_crypt__default_rounds=4000)
+
+ #--------------------------------------------------------------
+ # border cases
+ #--------------------------------------------------------------
+
+ # rejects non-string secrets
+ cc = CryptContext(["des_crypt"])
+ for secret, kwds in self.nonstring_vectors:
+ self.assertRaises(TypeError, cc.encrypt, secret, **kwds)
+
+ # throws error without schemes
+ self.assertRaises(KeyError, CryptContext().encrypt, 'secret')
+
+ def test_44_identify(self):
+ "test identify() border cases"
+ handlers = ["md5_crypt", "des_crypt", "bsdi_crypt"]
+ cc = CryptContext(handlers)
+
+ #check unknown hash
+ self.assertEqual(cc.identify('$9$232323123$1287319827'), None)
+ self.assertRaises(ValueError, cc.identify, '$9$232323123$1287319827', required=True)
+
+ #--------------------------------------------------------------
+ # border cases
+ #--------------------------------------------------------------
+
+ # rejects non-string hashes
+ cc = CryptContext(["des_crypt"])
+ for hash, kwds in self.nonstring_vectors:
+ self.assertRaises(TypeError, cc.identify, hash, **kwds)
+
+ # throws error without schemes
+ cc = CryptContext()
+ self.assertIs(cc.identify('hash'), None)
+ self.assertRaises(KeyError, cc.identify, 'hash', required=True)
+
+ def test_45_verify(self):
+ "test verify() scheme kwd"
+ handlers = ["md5_crypt", "des_crypt", "bsdi_crypt"]
+ cc = CryptContext(handlers)
+
+ h = hash.md5_crypt.encrypt("test")
+
+ #check base verify
+ self.assertTrue(cc.verify("test", h))
+ self.assertTrue(not cc.verify("notest", h))
+
+ #check verify using right alg
+ self.assertTrue(cc.verify('test', h, scheme='md5_crypt'))
+ self.assertTrue(not cc.verify('notest', h, scheme='md5_crypt'))
+
+ #check verify using wrong alg
+ self.assertRaises(ValueError, cc.verify, 'test', h, scheme='bsdi_crypt')
+
+ #--------------------------------------------------------------
+ # border cases
+ #--------------------------------------------------------------
+
+ # rejects non-string secrets
+ cc = CryptContext(["des_crypt"])
+ h = cc.encrypt('stub')
+ for secret, kwds in self.nonstring_vectors:
+ self.assertRaises(TypeError, cc.verify, secret, h, **kwds)
+
+ # rejects non-string hashes
+ cc = CryptContext(["des_crypt"])
+ for h, kwds in self.nonstring_vectors:
+ self.assertRaises(TypeError, cc.verify, 'secret', h, **kwds)
+
+ # throws error without schemes
+ self.assertRaises(KeyError, CryptContext().verify, 'secret', 'hash')
+
+ def test_46_hash_needs_update(self):
+ "test hash_needs_update() method"
+ cc = CryptContext(**self.sample_4_dict)
+
+ #check deprecated scheme
+ self.assertTrue(cc.hash_needs_update('9XXD4trGYeGJA'))
+ self.assertFalse(cc.hash_needs_update('$1$J8HC2RCr$HcmM.7NxB2weSvlw2FgzU0'))
+
+ #check min rounds
+ self.assertTrue(cc.hash_needs_update('$5$rounds=1999$jD81UCoo.zI.UETs$Y7qSTQ6mTiU9qZB4fRr43wRgQq4V.5AAf7F97Pzxey/'))
+ self.assertFalse(cc.hash_needs_update('$5$rounds=2000$228SSRje04cnNCaQ$YGV4RYu.5sNiBvorQDlO0WWQjyJVGKBcJXz3OtyQ2u8'))
+
+ #check max rounds
+ self.assertFalse(cc.hash_needs_update('$5$rounds=3000$fS9iazEwTKi7QPW4$VasgBC8FqlOvD7x2HhABaMXCTh9jwHclPA9j5YQdns.'))
+ self.assertTrue(cc.hash_needs_update('$5$rounds=3001$QlFHHifXvpFX4PLs$/0ekt7lSs/lOikSerQ0M/1porEHxYq7W/2hdFpxA3fA'))
+
+ #--------------------------------------------------------------
+ # border cases
+ #--------------------------------------------------------------
+
+ # rejects non-string hashes
+ cc = CryptContext(["des_crypt"])
+ for hash, kwds in self.nonstring_vectors:
+ self.assertRaises(TypeError, cc.hash_needs_update, hash, **kwds)
+
+ # throws error without schemes
+ self.assertRaises(KeyError, CryptContext().hash_needs_update, 'hash')
+
+ def test_47_verify_and_update(self):
+ "test verify_and_update()"
+ cc = CryptContext(**self.sample_4_dict)
+
+ #create some hashes
+ h1 = cc.encrypt("password", scheme="des_crypt")
+ h2 = cc.encrypt("password", scheme="sha256_crypt")
+
+ #check bad password, deprecated hash
+ ok, new_hash = cc.verify_and_update("wrongpass", h1)
+ self.assertFalse(ok)
+ self.assertIs(new_hash, None)
+
+ #check bad password, good hash
+ ok, new_hash = cc.verify_and_update("wrongpass", h2)
+ self.assertFalse(ok)
+ self.assertIs(new_hash, None)
+
+ #check right password, deprecated hash
+ ok, new_hash = cc.verify_and_update("password", h1)
+ self.assertTrue(ok)
+ self.assertTrue(cc.identify(new_hash), "sha256_crypt")
+
+ #check right password, good hash
+ ok, new_hash = cc.verify_and_update("password", h2)
+ self.assertTrue(ok)
+ self.assertIs(new_hash, None)
+
+ #--------------------------------------------------------------
+ # border cases
+ #--------------------------------------------------------------
+
+ # rejects non-string secrets
+ cc = CryptContext(["des_crypt"])
+ hash = cc.encrypt('stub')
+ for secret, kwds in self.nonstring_vectors:
+ self.assertRaises(TypeError, cc.verify_and_update, secret, hash, **kwds)
+
+ # rejects non-string hashes
+ cc = CryptContext(["des_crypt"])
+ for hash, kwds in self.nonstring_vectors:
+ self.assertRaises(TypeError, cc.verify_and_update, 'secret', hash, **kwds)
+
+ # throws error without schemes
+ self.assertRaises(KeyError, CryptContext().verify_and_update, 'secret', 'hash')
+
+ #=========================================================
+ # rounds options
+ #=========================================================
+ # NOTE: the follow tests check how _CryptRecord handles
+ # the min/max/default/vary_rounds options, via the output of
+ # genconfig(). it's assumed encrypt() takes the same codepath.
+
+ def test_50_rounds_limits(self):
+ "test rounds limits"
+ cc = CryptContext(schemes=["sha256_crypt"],
all__min_rounds=2000,
all__max_rounds=3000,
all__default_rounds=2500,
@@ -631,7 +1031,7 @@ class CryptContextTest(TestCase):
with catch_warnings(record=True) as wlog:
# set below handler min
- c2 = cc.replace(all__min_rounds=500, all__max_rounds=None,
+ c2 = cc.copy(all__min_rounds=500, all__max_rounds=None,
all__default_rounds=500)
self.consumeWarningList(wlog, [PasslibConfigWarning]*2)
self.assertEqual(c2.genconfig(salt="nacl"), "$5$rounds=1000$nacl$")
@@ -661,7 +1061,7 @@ class CryptContextTest(TestCase):
# max rounds
with catch_warnings(record=True) as wlog:
# set above handler max
- c2 = cc.replace(all__max_rounds=int(1e9)+500, all__min_rounds=None,
+ c2 = cc.copy(all__max_rounds=int(1e9)+500, all__min_rounds=None,
all__default_rounds=int(1e9)+500)
self.consumeWarningList(wlog, [PasslibConfigWarning]*2)
self.assertEqual(c2.genconfig(salt="nacl"),
@@ -693,225 +1093,119 @@ class CryptContextTest(TestCase):
self.assertEqual(cc.genconfig(salt="nacl"), '$5$rounds=2500$nacl$')
# fallback default rounds - use handler's
- c2 = cc.replace(all__default_rounds=None, all__max_rounds=50000)
- self.assertEqual(c2.genconfig(salt="nacl"), '$5$rounds=40000$nacl$')
+ df = hash.sha256_crypt.default_rounds
+ c2 = cc.copy(all__default_rounds=None, all__max_rounds=df<<1)
+ self.assertEqual(c2.genconfig(salt="nacl"),
+ '$5$rounds=%d$nacl$' % df)
# fallback default rounds - use handler's, but clipped to max rounds
- c2 = cc.replace(all__default_rounds=None, all__max_rounds=3000)
+ c2 = cc.copy(all__default_rounds=None, all__max_rounds=3000)
self.assertEqual(c2.genconfig(salt="nacl"), '$5$rounds=3000$nacl$')
# TODO: test default falls back to mx / mn if handler has no default.
#default rounds - out of bounds
- self.assertRaises(ValueError, cc.replace, all__default_rounds=1999)
- cc.policy.replace(all__default_rounds=2000)
- cc.policy.replace(all__default_rounds=3000)
- self.assertRaises(ValueError, cc.replace, all__default_rounds=3001)
+ self.assertRaises(ValueError, cc.copy, all__default_rounds=1999)
+ cc.copy(all__default_rounds=2000)
+ cc.copy(all__default_rounds=3000)
+ self.assertRaises(ValueError, cc.copy, all__default_rounds=3001)
# invalid min/max bounds
- c2 = CryptContext(policy=None, schemes=["sha256_crypt"])
- self.assertRaises(ValueError, c2.replace, all__min_rounds=-1)
- self.assertRaises(ValueError, c2.replace, all__max_rounds=-1)
- self.assertRaises(ValueError, c2.replace, all__min_rounds=2000,
+ c2 = CryptContext(schemes=["sha256_crypt"])
+ self.assertRaises(ValueError, c2.copy, all__min_rounds=-1)
+ self.assertRaises(ValueError, c2.copy, all__max_rounds=-1)
+ self.assertRaises(ValueError, c2.copy, all__min_rounds=2000,
all__max_rounds=1999)
- def test_10_03_genconfig_linear_vary_rounds(self):
- "test genconfig() linear vary rounds"
- cc = CryptContext(policy=None,
- schemes=["sha256_crypt"],
+ def test_51_linear_vary_rounds(self):
+ "test linear vary rounds"
+ cc = CryptContext(schemes=["sha256_crypt"],
all__min_rounds=1995,
all__max_rounds=2005,
all__default_rounds=2000,
)
# test negative
- self.assertRaises(ValueError, cc.replace, all__vary_rounds=-1)
- self.assertRaises(ValueError, cc.replace, all__vary_rounds="-1%")
- self.assertRaises(ValueError, cc.replace, all__vary_rounds="101%")
+ self.assertRaises(ValueError, cc.copy, all__vary_rounds=-1)
+ self.assertRaises(ValueError, cc.copy, all__vary_rounds="-1%")
+ self.assertRaises(ValueError, cc.copy, all__vary_rounds="101%")
# test static
- c2 = cc.replace(all__vary_rounds=0)
+ c2 = cc.copy(all__vary_rounds=0)
self.assert_rounds_range(c2, "sha256_crypt", 2000, 2000)
- c2 = cc.replace(all__vary_rounds="0%")
+ c2 = cc.copy(all__vary_rounds="0%")
self.assert_rounds_range(c2, "sha256_crypt", 2000, 2000)
# test absolute
- c2 = cc.replace(all__vary_rounds=1)
+ c2 = cc.copy(all__vary_rounds=1)
self.assert_rounds_range(c2, "sha256_crypt", 1999, 2001)
- c2 = cc.replace(all__vary_rounds=100)
+ c2 = cc.copy(all__vary_rounds=100)
self.assert_rounds_range(c2, "sha256_crypt", 1995, 2005)
# test relative
- c2 = cc.replace(all__vary_rounds="0.1%")
+ c2 = cc.copy(all__vary_rounds="0.1%")
self.assert_rounds_range(c2, "sha256_crypt", 1998, 2002)
- c2 = cc.replace(all__vary_rounds="100%")
+ c2 = cc.copy(all__vary_rounds="100%")
self.assert_rounds_range(c2, "sha256_crypt", 1995, 2005)
- def test_10_03_genconfig_log2_vary_rounds(self):
- "test genconfig() log2 vary rounds"
- cc = CryptContext(policy=None,
- schemes=["bcrypt"],
+ def test_52_log2_vary_rounds(self):
+ "test log2 vary rounds"
+ cc = CryptContext(schemes=["bcrypt"],
all__min_rounds=15,
all__max_rounds=25,
all__default_rounds=20,
)
# test negative
- self.assertRaises(ValueError, cc.replace, all__vary_rounds=-1)
- self.assertRaises(ValueError, cc.replace, all__vary_rounds="-1%")
- self.assertRaises(ValueError, cc.replace, all__vary_rounds="101%")
+ self.assertRaises(ValueError, cc.copy, all__vary_rounds=-1)
+ self.assertRaises(ValueError, cc.copy, all__vary_rounds="-1%")
+ self.assertRaises(ValueError, cc.copy, all__vary_rounds="101%")
# test static
- c2 = cc.replace(all__vary_rounds=0)
+ c2 = cc.copy(all__vary_rounds=0)
self.assert_rounds_range(c2, "bcrypt", 20, 20)
- c2 = cc.replace(all__vary_rounds="0%")
+ c2 = cc.copy(all__vary_rounds="0%")
self.assert_rounds_range(c2, "bcrypt", 20, 20)
# test absolute
- c2 = cc.replace(all__vary_rounds=1)
+ c2 = cc.copy(all__vary_rounds=1)
self.assert_rounds_range(c2, "bcrypt", 19, 21)
- c2 = cc.replace(all__vary_rounds=100)
+ c2 = cc.copy(all__vary_rounds=100)
self.assert_rounds_range(c2, "bcrypt", 15, 25)
# test relative - should shift over at 50% mark
- c2 = cc.replace(all__vary_rounds="1%")
+ c2 = cc.copy(all__vary_rounds="1%")
self.assert_rounds_range(c2, "bcrypt", 20, 20)
- c2 = cc.replace(all__vary_rounds="49%")
+ c2 = cc.copy(all__vary_rounds="49%")
self.assert_rounds_range(c2, "bcrypt", 20, 20)
- c2 = cc.replace(all__vary_rounds="50%")
+ c2 = cc.copy(all__vary_rounds="50%")
self.assert_rounds_range(c2, "bcrypt", 19, 20)
- c2 = cc.replace(all__vary_rounds="100%")
+ c2 = cc.copy(all__vary_rounds="100%")
self.assert_rounds_range(c2, "bcrypt", 15, 21)
def assert_rounds_range(self, context, scheme, lower, upper):
"helper to check vary_rounds covers specified range"
# NOTE: this runs enough times the min and max *should* be hit,
# though there's a faint chance it will randomly fail.
- handler = context.policy.get_handler(scheme)
+ handler = context.handler(scheme)
salt = handler.default_salt_chars[0:1] * handler.max_salt_size
seen = set()
for i in irange(300):
h = context.genconfig(scheme, salt=salt)
r = handler.from_string(h).rounds
seen.add(r)
- self.assertEqual(min(seen), lower, "vary_rounds lower bound:")
- self.assertEqual(max(seen), upper, "vary_rounds upper bound:")
-
- def test_11_encrypt_settings(self):
- "test encrypt() honors policy settings"
- cc = CryptContext(**self.sample_policy_1)
-
- # hash specific settings
- self.assertEqual(
- cc.encrypt("password", scheme="phpass", salt='.'*8),
- '$H$5........De04R5Egz0aq8Tf.1eVhY/',
- )
- self.assertEqual(
- cc.encrypt("password", scheme="phpass", salt='.'*8, ident="P"),
- '$P$5........De04R5Egz0aq8Tf.1eVhY/',
- )
-
- # NOTE: more thorough job of rounds limits done in genconfig() test,
- # which is much cheaper, and shares the same codebase.
-
- # min rounds
- with catch_warnings(record=True) as wlog:
- self.assertEqual(
- cc.encrypt("password", rounds=1999, salt="nacl"),
- '$5$rounds=2000$nacl$9/lTZ5nrfPuz8vphznnmHuDGFuvjSNvOEDsGmGfsS97',
- )
- self.consumeWarningList(wlog, PasslibConfigWarning)
-
- self.assertEqual(
- cc.encrypt("password", rounds=2001, salt="nacl"),
- '$5$rounds=2001$nacl$8PdeoPL4aXQnJ0woHhqgIw/efyfCKC2WHneOpnvF.31'
- )
- self.consumeWarningList(wlog)
-
- # max rounds, etc tested in genconfig()
-
- # make default > max throws error if attempted
- self.assertRaises(ValueError, cc.replace,
- sha256_crypt__default_rounds=4000)
-
- def test_12_hash_needs_update(self):
- "test hash_needs_update() method"
- cc = CryptContext(**self.sample_policy_1)
-
- #check deprecated scheme
- self.assertTrue(cc.hash_needs_update('9XXD4trGYeGJA'))
- self.assertFalse(cc.hash_needs_update('$1$J8HC2RCr$HcmM.7NxB2weSvlw2FgzU0'))
-
- #check min rounds
- self.assertTrue(cc.hash_needs_update('$5$rounds=1999$jD81UCoo.zI.UETs$Y7qSTQ6mTiU9qZB4fRr43wRgQq4V.5AAf7F97Pzxey/'))
- self.assertFalse(cc.hash_needs_update('$5$rounds=2000$228SSRje04cnNCaQ$YGV4RYu.5sNiBvorQDlO0WWQjyJVGKBcJXz3OtyQ2u8'))
-
- #check max rounds
- self.assertFalse(cc.hash_needs_update('$5$rounds=3000$fS9iazEwTKi7QPW4$VasgBC8FqlOvD7x2HhABaMXCTh9jwHclPA9j5YQdns.'))
- self.assertTrue(cc.hash_needs_update('$5$rounds=3001$QlFHHifXvpFX4PLs$/0ekt7lSs/lOikSerQ0M/1porEHxYq7W/2hdFpxA3fA'))
+ self.assertEqual(min(seen), lower, "vary_rounds had wrong lower limit:")
+ self.assertEqual(max(seen), upper, "vary_rounds had wrong upper limit:")
#=========================================================
- #identify
+ # feature tests
#=========================================================
- def test_20_basic(self):
- "test basic encrypt/identify/verify functionality"
- handlers = [hash.md5_crypt, hash.des_crypt, hash.bsdi_crypt]
- cc = CryptContext(handlers, policy=None)
-
- #run through handlers
- for crypt in handlers:
- h = cc.encrypt("test", scheme=crypt.name)
- self.assertEqual(cc.identify(h), crypt.name)
- self.assertEqual(cc.identify(h, resolve=True), crypt)
- self.assertTrue(cc.verify('test', h))
- self.assertTrue(not cc.verify('notest', h))
-
- #test default
- h = cc.encrypt("test")
- self.assertEqual(cc.identify(h), "md5_crypt")
-
- #test genhash
- h = cc.genhash('secret', cc.genconfig())
- self.assertEqual(cc.identify(h), 'md5_crypt')
-
- h = cc.genhash('secret', cc.genconfig(), scheme='md5_crypt')
- self.assertEqual(cc.identify(h), 'md5_crypt')
-
- self.assertRaises(ValueError, cc.genhash, 'secret', cc.genconfig(), scheme="des_crypt")
-
- def test_21_identify(self):
- "test identify() border cases"
- handlers = ["md5_crypt", "des_crypt", "bsdi_crypt"]
- cc = CryptContext(handlers, policy=None)
-
- #check unknown hash
- self.assertEqual(cc.identify('$9$232323123$1287319827'), None)
- self.assertRaises(ValueError, cc.identify, '$9$232323123$1287319827', required=True)
-
- def test_22_verify(self):
- "test verify() scheme kwd"
- handlers = ["md5_crypt", "des_crypt", "bsdi_crypt"]
- cc = CryptContext(handlers, policy=None)
-
- h = hash.md5_crypt.encrypt("test")
-
- #check base verify
- self.assertTrue(cc.verify("test", h))
- self.assertTrue(not cc.verify("notest", h))
-
- #check verify using right alg
- self.assertTrue(cc.verify('test', h, scheme='md5_crypt'))
- self.assertTrue(not cc.verify('notest', h, scheme='md5_crypt'))
-
- #check verify using wrong alg
- self.assertRaises(ValueError, cc.verify, 'test', h, scheme='bsdi_crypt')
-
- def test_24_min_verify_time(self):
+ def test_60_min_verify_time(self):
"test verify() honors min_verify_time"
#NOTE: this whole test assumes time.sleep() and tick()
# have better than 100ms accuracy - set via delta.
@@ -967,111 +1261,10 @@ class CryptContextTest(TestCase):
self.assertAlmostEqual(elapsed, max_delay, delta=delta)
self.consumeWarningList(wlog, ".*verify exceeded min_verify_time")
- def test_25_verify_and_update(self):
- "test verify_and_update()"
- cc = CryptContext(**self.sample_policy_1)
-
- #create some hashes
- h1 = cc.encrypt("password", scheme="des_crypt")
- h2 = cc.encrypt("password", scheme="sha256_crypt")
-
- #check bad password, deprecated hash
- ok, new_hash = cc.verify_and_update("wrongpass", h1)
- self.assertFalse(ok)
- self.assertIs(new_hash, None)
-
- #check bad password, good hash
- ok, new_hash = cc.verify_and_update("wrongpass", h2)
- self.assertFalse(ok)
- self.assertIs(new_hash, None)
-
- #check right password, deprecated hash
- ok, new_hash = cc.verify_and_update("password", h1)
- self.assertTrue(ok)
- self.assertTrue(cc.identify(new_hash), "sha256_crypt")
-
- #check right password, good hash
- ok, new_hash = cc.verify_and_update("password", h2)
- self.assertTrue(ok)
- self.assertIs(new_hash, None)
-
- #=========================================================
- # border cases
- #=========================================================
- def test_30_nonstring_hash(self):
- "test non-string hash values cause error"
- #
- # test hash=None or some other non-string causes TypeError
- # and that explicit-scheme code path behaves the same.
- #
- cc = CryptContext(["des_crypt"])
- for hash, kwds in [
- (None, {}),
- (None, {"scheme": "des_crypt"}),
- (1, {}),
- ((), {}),
- ]:
-
- self.assertRaises(TypeError, cc.identify, hash, **kwds)
- self.assertRaises(TypeError, cc.genhash, 'stub', hash, **kwds)
- self.assertRaises(TypeError, cc.verify, 'stub', hash, **kwds)
- self.assertRaises(TypeError, cc.verify_and_update, 'stub', hash, **kwds)
- self.assertRaises(TypeError, cc.hash_needs_update, hash, **kwds)
-
- #
- # but genhash *should* accept None if default scheme lacks config string.
- #
- cc2 = CryptContext(["mysql323"])
- self.assertRaises(TypeError, cc2.identify, None)
- self.assertIsInstance(cc2.genhash("stub", None), str)
- self.assertRaises(TypeError, cc2.verify, 'stub', None)
- self.assertRaises(TypeError, cc2.verify_and_update, 'stub', None)
- self.assertRaises(TypeError, cc2.hash_needs_update, None)
-
-
- def test_31_nonstring_secret(self):
- "test non-string password values cause error"
- cc = CryptContext(["des_crypt"])
- hash = cc.encrypt("stub")
- #
- # test secret=None, or some other non-string causes TypeError
- #
- for secret, kwds in [
- (None, {}),
- (None, {"scheme": "des_crypt"}),
- (1, {}),
- ((), {}),
- ]:
- self.assertRaises(TypeError, cc.encrypt, secret, **kwds)
- self.assertRaises(TypeError, cc.genhash, secret, hash, **kwds)
- self.assertRaises(TypeError, cc.verify, secret, hash, **kwds)
- self.assertRaises(TypeError, cc.verify_and_update, secret, hash, **kwds)
-
- #=========================================================
- # other
- #=========================================================
- def test_90_bcrypt_normhash(self):
- "teset verify_and_update / hash_needs_update corrects bcrypt padding"
- # see issue 25.
- bcrypt = hash.bcrypt
-
- PASS1 = "loppux"
- BAD1 = "$2a$12$oaQbBqq8JnSM1NHRPQGXORm4GCUMqp7meTnkft4zgSnrbhoKdDV0C"
- GOOD1 = "$2a$12$oaQbBqq8JnSM1NHRPQGXOOm4GCUMqp7meTnkft4zgSnrbhoKdDV0C"
- ctx = CryptContext(["bcrypt"])
-
- with catch_warnings(record=True) as wlog:
- self.assertTrue(ctx.hash_needs_update(BAD1))
- self.assertFalse(ctx.hash_needs_update(GOOD1))
-
- if bcrypt.has_backend():
- self.assertEqual(ctx.verify_and_update(PASS1,GOOD1), (True,None))
- self.assertEqual(ctx.verify_and_update("x",BAD1), (False,None))
- res = ctx.verify_and_update(PASS1, BAD1)
- self.assertTrue(res[0] and res[1] and res[1] != BAD1)
-
- def test_91_passprep(self):
+ def test_61_passprep(self):
"test passprep option"
+ self.require_stringprep()
+
# saslprep should normalize pu -> pn
pu = u("a\u0300") # unnormalized unicode
pn = u("\u00E0") # normalized unicode
@@ -1126,8 +1319,45 @@ class CryptContextTest(TestCase):
self.assertFalse(ctx.verify(pu, ctx.encrypt(pn, scheme="md5_crypt")))
self.assertTrue(ctx.verify(pu, ctx.encrypt(pn, scheme="sha256_crypt")))
+ def test_62_bcrypt_update(self):
+ "test verify_and_update / hash_needs_update corrects bcrypt padding"
+ # see issue 25.
+ bcrypt = hash.bcrypt
+
+ PASS1 = "loppux"
+ BAD1 = "$2a$12$oaQbBqq8JnSM1NHRPQGXORm4GCUMqp7meTnkft4zgSnrbhoKdDV0C"
+ GOOD1 = "$2a$12$oaQbBqq8JnSM1NHRPQGXOOm4GCUMqp7meTnkft4zgSnrbhoKdDV0C"
+ ctx = CryptContext(["bcrypt"])
+
+ with catch_warnings(record=True) as wlog:
+ self.assertTrue(ctx.hash_needs_update(BAD1))
+ self.assertFalse(ctx.hash_needs_update(GOOD1))
+
+ if bcrypt.has_backend():
+ self.assertEqual(ctx.verify_and_update(PASS1,GOOD1), (True,None))
+ self.assertEqual(ctx.verify_and_update("x",BAD1), (False,None))
+ ok, new_hash = ctx.verify_and_update(PASS1, BAD1)
+ self.assertTrue(ok)
+ self.assertTrue(new_hash and new_hash != BAD1)
+
+ def test_63_bsdi_crypt_update(self):
+ "test verify_and_update / hash_needs_update correct bsdi even rounds"
+ even_hash = '_Y/../cG0zkJa6LY6k4c'
+ odd_hash = '_Z/..TgFg0/ptQtpAgws'
+ secret = 'test'
+ ctx = CryptContext(['bsdi_crypt'])
+
+ self.assertTrue(ctx.hash_needs_update(even_hash))
+ self.assertFalse(ctx.hash_needs_update(odd_hash))
+
+ self.assertEqual(ctx.verify_and_update(secret, odd_hash), (True,None))
+ self.assertEqual(ctx.verify_and_update("x", even_hash), (False,None))
+ ok, new_hash = ctx.verify_and_update(secret, even_hash)
+ self.assertTrue(ok)
+ self.assertTrue(new_hash and new_hash != even_hash)
+
#=========================================================
- #eoc
+ # eoc
#=========================================================
#=========================================================
@@ -1153,44 +1383,25 @@ class LazyCryptContextTest(TestCase):
self.assertFalse(has_crypt_handler("dummy_2", True))
- self.assertTrue(cc.policy.handler_is_deprecated("des_crypt"))
- self.assertEqual(cc.policy.schemes(), ["dummy_2", "des_crypt"])
+ self.assertEqual(cc.schemes(), ("dummy_2", "des_crypt"))
+ self.assertTrue(cc._is_deprecated_scheme("des_crypt"))
self.assertTrue(has_crypt_handler("dummy_2", True))
def test_callable_constructor(self):
- "test create_policy() hook, returning CryptPolicy"
- self.assertFalse(has_crypt_handler("dummy_2"))
- register_crypt_handler_path("dummy_2", "passlib.tests.test_context")
-
- def create_policy(flag=False):
- self.assertTrue(flag)
- return CryptPolicy(schemes=iter(["dummy_2", "des_crypt"]), deprecated=["des_crypt"])
-
- cc = LazyCryptContext(create_policy=create_policy, flag=True)
-
- self.assertFalse(has_crypt_handler("dummy_2", True))
-
- self.assertTrue(cc.policy.handler_is_deprecated("des_crypt"))
- self.assertEqual(cc.policy.schemes(), ["dummy_2", "des_crypt"])
-
- self.assertTrue(has_crypt_handler("dummy_2", True))
-
- def test_callable_constructor2(self):
- "test create_policy() hook, returning dict"
self.assertFalse(has_crypt_handler("dummy_2"))
register_crypt_handler_path("dummy_2", "passlib.tests.test_context")
- def create_policy(flag=False):
+ def onload(flag=False):
self.assertTrue(flag)
return dict(schemes=iter(["dummy_2", "des_crypt"]), deprecated=["des_crypt"])
- cc = LazyCryptContext(create_policy=create_policy, flag=True)
+ cc = LazyCryptContext(onload=onload, flag=True)
self.assertFalse(has_crypt_handler("dummy_2", True))
- self.assertTrue(cc.policy.handler_is_deprecated("des_crypt"))
- self.assertEqual(cc.policy.schemes(), ["dummy_2", "des_crypt"])
+ self.assertEqual(cc.schemes(), ("dummy_2", "des_crypt"))
+ self.assertTrue(cc._is_deprecated_scheme("des_crypt"))
self.assertTrue(has_crypt_handler("dummy_2", True))