summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Metzmacher <metze@samba.org>2017-03-02 16:41:20 +0100
committerAndrew Bartlett <abartlet@samba.org>2017-03-03 08:59:16 +0100
commitfdacca53bdc0d1021e4b32fee1f8eb7ed4345ee5 (patch)
tree4011f5f74eb37052ee8a3933be0c62bb8ffd84e9
parent81ccdad9d045a7a6d6a569d1685bb0bf4e64d12a (diff)
downloadsamba-fdacca53bdc0d1021e4b32fee1f8eb7ed4345ee5.tar.gz
dsdb/tests: add test_ldap_bind_must_change_pwd()
This tests the error messages for failing LDAP Bind responses. BUG: https://bugzilla.samba.org/show_bug.cgi?id=9048 Signed-off-by: Stefan Metzmacher <metze@samba.org> Reviewed-by: Andrew Bartlett <abartlet@samba.org>
-rwxr-xr-xsource4/dsdb/tests/python/sam.py177
1 files changed, 177 insertions, 0 deletions
diff --git a/source4/dsdb/tests/python/sam.py b/source4/dsdb/tests/python/sam.py
index f57454b7725..10775d331fe 100755
--- a/source4/dsdb/tests/python/sam.py
+++ b/source4/dsdb/tests/python/sam.py
@@ -13,6 +13,7 @@ from samba.tests.subunitrun import SubunitOptions, TestProgram
import samba.getopt as options
+from samba.credentials import Credentials, DONT_USE_KERBEROS
from samba.auth import system_session
from ldb import SCOPE_BASE, LdbError
from ldb import ERR_NO_SUCH_OBJECT, ERR_ATTRIBUTE_OR_VALUE_EXISTS
@@ -22,6 +23,8 @@ from ldb import ERR_OBJECT_CLASS_VIOLATION
from ldb import ERR_CONSTRAINT_VIOLATION
from ldb import ERR_UNDEFINED_ATTRIBUTE_TYPE
from ldb import ERR_INSUFFICIENT_ACCESS_RIGHTS
+from ldb import ERR_INVALID_CREDENTIALS
+from ldb import ERR_STRONG_AUTH_REQUIRED
from ldb import Message, MessageElement, Dn
from ldb import FLAG_MOD_ADD, FLAG_MOD_REPLACE, FLAG_MOD_DELETE
from samba.samdb import SamDB
@@ -47,6 +50,7 @@ from samba.dcerpc import drsuapi
from samba.dcerpc import security
from samba.tests import delete_force
from samba import gensec
+from samba import werror
parser = optparse.OptionParser("sam.py [options] <host>")
sambaopts = options.SambaOptions(parser)
@@ -1626,6 +1630,179 @@ class SamTests(samba.tests.TestCase):
delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
+ def test_ldap_bind_must_change_pwd(self):
+ """Test the error messages for failing LDAP binds"""
+ print "Test the error messages for failing LDAP binds\n"
+
+ delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
+
+ def format_error_msg(hresult_v, dsid_v, werror_v):
+ #
+ # There are 4 lower case hex digits following 'v' at the end,
+ # but different Windows Versions return different values:
+ #
+ # Windows 2008R2 uses 'v1db1'
+ # Windows 2012R2 uses 'v2580'
+ #
+ return "%08X: LdapErr: DSID-%08X, comment: AcceptSecurityContext error, data %x, v" % (
+ hresult_v, dsid_v, werror_v)
+
+ HRES_SEC_E_LOGON_DENIED = 0x8009030C
+ HRES_SEC_E_INVALID_TOKEN = 0x80090308
+
+ sasl_bind_dsid = 0x0C0904DC
+ simple_bind_dsid = 0x0C0903A9
+
+ error_msg_sasl_wrong_pw = format_error_msg(
+ HRES_SEC_E_LOGON_DENIED,
+ sasl_bind_dsid,
+ werror.WERR_LOGON_FAILURE)
+ error_msg_sasl_must_change = format_error_msg(
+ HRES_SEC_E_LOGON_DENIED,
+ sasl_bind_dsid,
+ werror.WERR_PASSWORD_MUST_CHANGE)
+ error_msg_simple_wrong_pw = format_error_msg(
+ HRES_SEC_E_INVALID_TOKEN,
+ simple_bind_dsid,
+ werror.WERR_LOGON_FAILURE)
+ error_msg_simple_must_change = format_error_msg(
+ HRES_SEC_E_INVALID_TOKEN,
+ simple_bind_dsid,
+ werror.WERR_PASSWORD_MUST_CHANGE)
+
+ username = "ldaptestuser"
+ password = "thatsAcomplPASS2"
+ utf16pw = unicode('"' + password.encode('utf-8') + '"', 'utf-8').encode('utf-16-le')
+
+ ldb.add({
+ "dn": "cn=ldaptestuser,cn=users," + self.base_dn,
+ "objectclass": "user",
+ "sAMAccountName": username,
+ "userAccountControl": str(UF_NORMAL_ACCOUNT),
+ "unicodePwd": utf16pw,
+ })
+
+ res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn,
+ scope=SCOPE_BASE,
+ attrs=["sAMAccountName", "sAMAccountType", "userAccountControl", "pwdLastSet"])
+ self.assertTrue(len(res1) == 1)
+ self.assertEqual(res1[0]["sAMAccountName"][0], username)
+ self.assertEqual(int(res1[0]["sAMAccountType"][0]), ATYPE_NORMAL_ACCOUNT)
+ self.assertEqual(int(res1[0]["userAccountControl"][0]), UF_NORMAL_ACCOUNT)
+ self.assertNotEqual(int(res1[0]["pwdLastSet"][0]), 0)
+
+ # Open a second LDB connection with the user credentials. Use the
+ # command line credentials for informations like the domain, the realm
+ # and the workstation.
+ sasl_creds = Credentials()
+ sasl_creds.set_username(username)
+ sasl_creds.set_password(password)
+ sasl_creds.set_domain(creds.get_domain())
+ sasl_creds.set_workstation(creds.get_workstation())
+ sasl_creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
+ sasl_creds.set_kerberos_state(DONT_USE_KERBEROS)
+
+ sasl_wrong_creds = Credentials()
+ sasl_wrong_creds.set_username(username)
+ sasl_wrong_creds.set_password("wrong")
+ sasl_wrong_creds.set_domain(creds.get_domain())
+ sasl_wrong_creds.set_workstation(creds.get_workstation())
+ sasl_wrong_creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
+ sasl_wrong_creds.set_kerberos_state(DONT_USE_KERBEROS)
+
+ simple_creds = Credentials()
+ simple_creds.set_bind_dn("cn=ldaptestuser,cn=users," + self.base_dn)
+ simple_creds.set_username(username)
+ simple_creds.set_password(password)
+ simple_creds.set_domain(creds.get_domain())
+ simple_creds.set_workstation(creds.get_workstation())
+ simple_creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
+ simple_creds.set_kerberos_state(DONT_USE_KERBEROS)
+
+ simple_wrong_creds = Credentials()
+ simple_wrong_creds.set_bind_dn("cn=ldaptestuser,cn=users," + self.base_dn)
+ simple_wrong_creds.set_username(username)
+ simple_wrong_creds.set_password("wrong")
+ simple_wrong_creds.set_domain(creds.get_domain())
+ simple_wrong_creds.set_workstation(creds.get_workstation())
+ simple_wrong_creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
+ simple_wrong_creds.set_kerberos_state(DONT_USE_KERBEROS)
+
+ sasl_ldb = SamDB(url=host, credentials=sasl_creds, lp=lp)
+ self.assertIsNotNone(sasl_ldb)
+ sasl_ldb = None
+
+ requires_strong_auth = False
+ try:
+ simple_ldb = SamDB(url=host, credentials=simple_creds, lp=lp)
+ self.assertIsNotNone(simple_ldb)
+ simple_ldb = None
+ except LdbError, (num, msg):
+ if num != ERR_STRONG_AUTH_REQUIRED:
+ raise
+ requires_strong_auth = True
+
+ def assertLDAPErrorMsg(msg, expected_msg):
+ self.assertTrue(expected_msg in msg,
+ "msg[%s] does not contain expected[%s]" % (
+ msg, expected_msg))
+
+ try:
+ ldb_fail = SamDB(url=host, credentials=sasl_wrong_creds, lp=lp)
+ self.fail()
+ except LdbError, (num, msg):
+ self.assertEquals(num, ERR_INVALID_CREDENTIALS)
+ self.assertTrue(error_msg_sasl_wrong_pw in msg)
+
+ if not requires_strong_auth:
+ try:
+ ldb_fail = SamDB(url=host, credentials=simple_wrong_creds, lp=lp)
+ self.fail()
+ except LdbError, (num, msg):
+ self.assertEquals(num, ERR_INVALID_CREDENTIALS)
+ assertLDAPErrorMsg(msg, error_msg_simple_wrong_pw)
+
+ m = Message()
+ m.dn = Dn(ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
+ m["pls1"] = MessageElement(str(0),
+ FLAG_MOD_REPLACE,
+ "pwdLastSet")
+ ldb.modify(m)
+
+ res1 = ldb.search("cn=ldaptestuser,cn=users," + self.base_dn,
+ scope=SCOPE_BASE, attrs=["pwdLastSet"])
+ self.assertEqual(int(res1[0]["pwdLastSet"][0]), 0)
+
+ try:
+ ldb_fail = SamDB(url=host, credentials=sasl_wrong_creds, lp=lp)
+ self.fail()
+ except LdbError, (num, msg):
+ self.assertEquals(num, ERR_INVALID_CREDENTIALS)
+ assertLDAPErrorMsg(msg, error_msg_sasl_wrong_pw)
+
+ try:
+ ldb_fail = SamDB(url=host, credentials=sasl_creds, lp=lp)
+ self.fail()
+ except LdbError, (num, msg):
+ self.assertEquals(num, ERR_INVALID_CREDENTIALS)
+ assertLDAPErrorMsg(msg, error_msg_sasl_must_change)
+
+ if not requires_strong_auth:
+ try:
+ ldb_fail = SamDB(url=host, credentials=simple_wrong_creds, lp=lp)
+ self.fail()
+ except LdbError, (num, msg):
+ self.assertEquals(num, ERR_INVALID_CREDENTIALS)
+ assertLDAPErrorMsg(msg, error_msg_simple_wrong_pw)
+
+ try:
+ ldb_fail = SamDB(url=host, credentials=simple_creds, lp=lp)
+ self.fail()
+ except LdbError, (num, msg):
+ self.assertEquals(num, ERR_INVALID_CREDENTIALS)
+ assertLDAPErrorMsg(msg, error_msg_simple_must_change)
+
+ delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
def test_userAccountControl(self):
"""Test the userAccountControl behaviour"""