diff options
author | Stefan Metzmacher <metze@samba.org> | 2017-03-02 16:41:20 +0100 |
---|---|---|
committer | Andrew Bartlett <abartlet@samba.org> | 2017-03-03 08:59:16 +0100 |
commit | fdacca53bdc0d1021e4b32fee1f8eb7ed4345ee5 (patch) | |
tree | 4011f5f74eb37052ee8a3933be0c62bb8ffd84e9 | |
parent | 81ccdad9d045a7a6d6a569d1685bb0bf4e64d12a (diff) | |
download | samba-fdacca53bdc0d1021e4b32fee1f8eb7ed4345ee5.tar.gz |
dsdb/tests: add test_ldap_bind_must_change_pwd()
This tests the error messages for failing LDAP Bind responses.
BUG: https://bugzilla.samba.org/show_bug.cgi?id=9048
Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
-rwxr-xr-x | source4/dsdb/tests/python/sam.py | 177 |
1 files changed, 177 insertions, 0 deletions
diff --git a/source4/dsdb/tests/python/sam.py b/source4/dsdb/tests/python/sam.py index f57454b7725..10775d331fe 100755 --- a/source4/dsdb/tests/python/sam.py +++ b/source4/dsdb/tests/python/sam.py @@ -13,6 +13,7 @@ from samba.tests.subunitrun import SubunitOptions, TestProgram import samba.getopt as options +from samba.credentials import Credentials, DONT_USE_KERBEROS from samba.auth import system_session from ldb import SCOPE_BASE, LdbError from ldb import ERR_NO_SUCH_OBJECT, ERR_ATTRIBUTE_OR_VALUE_EXISTS @@ -22,6 +23,8 @@ from ldb import ERR_OBJECT_CLASS_VIOLATION from ldb import ERR_CONSTRAINT_VIOLATION from ldb import ERR_UNDEFINED_ATTRIBUTE_TYPE from ldb import ERR_INSUFFICIENT_ACCESS_RIGHTS +from ldb import ERR_INVALID_CREDENTIALS +from ldb import ERR_STRONG_AUTH_REQUIRED from ldb import Message, MessageElement, Dn from ldb import FLAG_MOD_ADD, FLAG_MOD_REPLACE, FLAG_MOD_DELETE from samba.samdb import SamDB @@ -47,6 +50,7 @@ from samba.dcerpc import drsuapi from samba.dcerpc import security from samba.tests import delete_force from samba import gensec +from samba import werror parser = optparse.OptionParser("sam.py [options] <host>") sambaopts = options.SambaOptions(parser) @@ -1626,6 +1630,179 @@ class SamTests(samba.tests.TestCase): delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + def test_ldap_bind_must_change_pwd(self): + """Test the error messages for failing LDAP binds""" + print "Test the error messages for failing LDAP binds\n" + + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + + def format_error_msg(hresult_v, dsid_v, werror_v): + # + # There are 4 lower case hex digits following 'v' at the end, + # but different Windows Versions return different values: + # + # Windows 2008R2 uses 'v1db1' + # Windows 2012R2 uses 'v2580' + # + return "%08X: LdapErr: DSID-%08X, comment: AcceptSecurityContext error, data %x, v" % ( + hresult_v, dsid_v, werror_v) + + HRES_SEC_E_LOGON_DENIED = 0x8009030C + HRES_SEC_E_INVALID_TOKEN = 0x80090308 + + sasl_bind_dsid = 0x0C0904DC + simple_bind_dsid = 0x0C0903A9 + + error_msg_sasl_wrong_pw = format_error_msg( + HRES_SEC_E_LOGON_DENIED, + sasl_bind_dsid, + werror.WERR_LOGON_FAILURE) + error_msg_sasl_must_change = format_error_msg( + HRES_SEC_E_LOGON_DENIED, + sasl_bind_dsid, + werror.WERR_PASSWORD_MUST_CHANGE) + error_msg_simple_wrong_pw = format_error_msg( + HRES_SEC_E_INVALID_TOKEN, + simple_bind_dsid, + werror.WERR_LOGON_FAILURE) + error_msg_simple_must_change = format_error_msg( + HRES_SEC_E_INVALID_TOKEN, + simple_bind_dsid, + werror.WERR_PASSWORD_MUST_CHANGE) + + username = "ldaptestuser" + password = "thatsAcomplPASS2" + utf16pw = unicode('"' + password.encode('utf-8') + '"', 'utf-8').encode('utf-16-le') + + ldb.add({ + "dn": "cn=ldaptestuser,cn=users," + self.base_dn, + "objectclass": "user", + "sAMAccountName": username, + "userAccountControl": str(UF_NORMAL_ACCOUNT), + "unicodePwd": utf16pw, + }) + + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, + attrs=["sAMAccountName", "sAMAccountType", "userAccountControl", "pwdLastSet"]) + self.assertTrue(len(res1) == 1) + self.assertEqual(res1[0]["sAMAccountName"][0], username) + self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT) + self.assertEqual(int(res1[0]["userAccountControl"][0]), UF_NORMAL_ACCOUNT) + self.assertNotEqual(int(res1[0]["pwdLastSet"][0]), 0) + + # Open a second LDB connection with the user credentials. Use the + # command line credentials for informations like the domain, the realm + # and the workstation. + sasl_creds = Credentials() + sasl_creds.set_username(username) + sasl_creds.set_password(password) + sasl_creds.set_domain(creds.get_domain()) + sasl_creds.set_workstation(creds.get_workstation()) + sasl_creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL) + sasl_creds.set_kerberos_state(DONT_USE_KERBEROS) + + sasl_wrong_creds = Credentials() + sasl_wrong_creds.set_username(username) + sasl_wrong_creds.set_password("wrong") + sasl_wrong_creds.set_domain(creds.get_domain()) + sasl_wrong_creds.set_workstation(creds.get_workstation()) + sasl_wrong_creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL) + sasl_wrong_creds.set_kerberos_state(DONT_USE_KERBEROS) + + simple_creds = Credentials() + simple_creds.set_bind_dn("cn=ldaptestuser,cn=users," + self.base_dn) + simple_creds.set_username(username) + simple_creds.set_password(password) + simple_creds.set_domain(creds.get_domain()) + simple_creds.set_workstation(creds.get_workstation()) + simple_creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL) + simple_creds.set_kerberos_state(DONT_USE_KERBEROS) + + simple_wrong_creds = Credentials() + simple_wrong_creds.set_bind_dn("cn=ldaptestuser,cn=users," + self.base_dn) + simple_wrong_creds.set_username(username) + simple_wrong_creds.set_password("wrong") + simple_wrong_creds.set_domain(creds.get_domain()) + simple_wrong_creds.set_workstation(creds.get_workstation()) + simple_wrong_creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL) + simple_wrong_creds.set_kerberos_state(DONT_USE_KERBEROS) + + sasl_ldb = SamDB(url=host, credentials=sasl_creds, lp=lp) + self.assertIsNotNone(sasl_ldb) + sasl_ldb = None + + requires_strong_auth = False + try: + simple_ldb = SamDB(url=host, credentials=simple_creds, lp=lp) + self.assertIsNotNone(simple_ldb) + simple_ldb = None + except LdbError, (num, msg): + if num != ERR_STRONG_AUTH_REQUIRED: + raise + requires_strong_auth = True + + def assertLDAPErrorMsg(msg, expected_msg): + self.assertTrue(expected_msg in msg, + "msg[%s] does not contain expected[%s]" % ( + msg, expected_msg)) + + try: + ldb_fail = SamDB(url=host, credentials=sasl_wrong_creds, lp=lp) + self.fail() + except LdbError, (num, msg): + self.assertEquals(num, ERR_INVALID_CREDENTIALS) + self.assertTrue(error_msg_sasl_wrong_pw in msg) + + if not requires_strong_auth: + try: + ldb_fail = SamDB(url=host, credentials=simple_wrong_creds, lp=lp) + self.fail() + except LdbError, (num, msg): + self.assertEquals(num, ERR_INVALID_CREDENTIALS) + assertLDAPErrorMsg(msg, error_msg_simple_wrong_pw) + + m = Message() + m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn) + m["pls1"] = MessageElement(str(0), + FLAG_MOD_REPLACE, + "pwdLastSet") + ldb.modify(m) + + res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn, + scope=SCOPE_BASE, attrs=["pwdLastSet"]) + self.assertEqual(int(res1[0]["pwdLastSet"][0]), 0) + + try: + ldb_fail = SamDB(url=host, credentials=sasl_wrong_creds, lp=lp) + self.fail() + except LdbError, (num, msg): + self.assertEquals(num, ERR_INVALID_CREDENTIALS) + assertLDAPErrorMsg(msg, error_msg_sasl_wrong_pw) + + try: + ldb_fail = SamDB(url=host, credentials=sasl_creds, lp=lp) + self.fail() + except LdbError, (num, msg): + self.assertEquals(num, ERR_INVALID_CREDENTIALS) + assertLDAPErrorMsg(msg, error_msg_sasl_must_change) + + if not requires_strong_auth: + try: + ldb_fail = SamDB(url=host, credentials=simple_wrong_creds, lp=lp) + self.fail() + except LdbError, (num, msg): + self.assertEquals(num, ERR_INVALID_CREDENTIALS) + assertLDAPErrorMsg(msg, error_msg_simple_wrong_pw) + + try: + ldb_fail = SamDB(url=host, credentials=simple_creds, lp=lp) + self.fail() + except LdbError, (num, msg): + self.assertEquals(num, ERR_INVALID_CREDENTIALS) + assertLDAPErrorMsg(msg, error_msg_simple_must_change) + + delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn) def test_userAccountControl(self): """Test the userAccountControl behaviour""" |