From 55932d7ecd02f29fbcba90418db574e6fe198b22 Mon Sep 17 00:00:00 2001 From: Stefan Metzmacher Date: Fri, 8 Jul 2016 15:26:18 +0200 Subject: s4:dsdb/tests: improve the RestoreUserObjectTestCase test We verify attributes, values and their replication metadata after each step (add, delete, reanimate). Signed-off-by: Stefan Metzmacher Reviewed-by: Andrew Bartlett --- source4/dsdb/tests/python/tombstone_reanimation.py | 199 ++++++++++++++++++++- 1 file changed, 192 insertions(+), 7 deletions(-) diff --git a/source4/dsdb/tests/python/tombstone_reanimation.py b/source4/dsdb/tests/python/tombstone_reanimation.py index 3fcdca3854d..cabde6ec357 100755 --- a/source4/dsdb/tests/python/tombstone_reanimation.py +++ b/source4/dsdb/tests/python/tombstone_reanimation.py @@ -24,6 +24,12 @@ import unittest sys.path.insert(0, "bin/python") import samba +from samba.ndr import ndr_unpack, ndr_print +from samba.dcerpc import misc +from samba.dcerpc import security +from samba.dcerpc import drsblobs +from samba.dcerpc.drsuapi import * + import samba.tests from ldb import (SCOPE_BASE, FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE, Dn, Message, MessageElement, LdbError, @@ -61,9 +67,10 @@ class RestoredObjectAttributesBaseTestCase(samba.tests.TestCase): def GUID_string(self, guid): return self.samdb.schema_format_value("objectGUID", guid) - def search_guid(self, guid): + def search_guid(self, guid, attrs=["*"]): res = self.samdb.search(base="" % self.GUID_string(guid), - scope=SCOPE_BASE, controls=["show_deleted:1"]) + scope=SCOPE_BASE, attrs=attrs, + controls=["show_deleted:1"]) self.assertEquals(len(res), 1) return res[0] @@ -133,6 +140,39 @@ class RestoredObjectAttributesBaseTestCase(samba.tests.TestCase): "Unexpected value (%s) for '%s', expected (%s)" % ( str(actual_val), name, expected_val)) + def _check_metadata(self, metadata, expected): + repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, str(metadata[0])) + + repl_array = [] + for o in repl.ctr.array: + repl_array.append((o.attid, o.version)) + repl_set = set(repl_array) + + expected_set = set(expected) + self.assertEqual(len(repl_set), len(expected), + "Unexpected metadata, missing from expected (%s), extra (%s)), repl: \n%s" % ( + str(expected_set.difference(repl_set)), + str(repl_set.difference(expected_set)), + ndr_print(repl))) + + i = 0 + for o in repl.ctr.array: + e = expected[i] + (attid, version) = e + self.assertEquals(attid, o.attid, + "(LDAP) Wrong attid " + "for expected value %d, wanted 0x%08x got 0x%08x, " + "repl: \n%s" + % (i, attid, o.attid, ndr_print(repl))) + # Allow version to be skipped when it does not matter + if version is not None: + self.assertEquals(o.version, version, + "(LDAP) Wrong version for expected value %d, " + "attid 0x%08x, " + "wanted %d got %d, repl: \n%s" + % (i, o.attid, + version, o.version, ndr_print(repl))) + i = i + 1 @staticmethod def restore_deleted_object(samdb, del_dn, new_dn, new_attrs=None): @@ -314,7 +354,7 @@ class BaseRestoreObjectTestCase(RestoredObjectAttributesBaseTestCase): class RestoreUserObjectTestCase(RestoredObjectAttributesBaseTestCase): """Test cases for delete/reanimate user objects""" - def _expected_user_attributes(self, username, user_dn, category): + def _expected_user_add_attributes(self, username, user_dn, category): return {'dn': user_dn, 'objectClass': '**', 'cn': username, @@ -335,17 +375,149 @@ class RestoreUserObjectTestCase(RestoredObjectAttributesBaseTestCase): 'lastLogoff': '0', 'pwdLastSet': '0', 'primaryGroupID': '513', - 'operatorCount': '0', 'objectSid': '**', - 'adminCount': '0', 'accountExpires': '9223372036854775807', 'logonCount': '0', 'sAMAccountName': username, 'sAMAccountType': '805306368', + 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn) + } + + def _expected_user_add_metadata(self): + return [ + (DRSUAPI_ATTID_objectClass, 1), + (DRSUAPI_ATTID_cn, 1), + (DRSUAPI_ATTID_instanceType, 1), + (DRSUAPI_ATTID_whenCreated, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, 1), + (DRSUAPI_ATTID_name, 1), + (DRSUAPI_ATTID_userAccountControl, None), + (DRSUAPI_ATTID_codePage, 1), + (DRSUAPI_ATTID_countryCode, 1), + (DRSUAPI_ATTID_dBCSPwd, 1), + (DRSUAPI_ATTID_logonHours, 1), + (DRSUAPI_ATTID_unicodePwd, 1), + (DRSUAPI_ATTID_ntPwdHistory, 1), + (DRSUAPI_ATTID_pwdLastSet, 1), + (DRSUAPI_ATTID_primaryGroupID, 1), + (DRSUAPI_ATTID_objectSid, 1), + (DRSUAPI_ATTID_accountExpires, 1), + (DRSUAPI_ATTID_lmPwdHistory, 1), + (DRSUAPI_ATTID_sAMAccountName, 1), + (DRSUAPI_ATTID_sAMAccountType, 1), + (DRSUAPI_ATTID_objectCategory, 1)] + + def _expected_user_del_attributes(self, username, _guid, _sid): + guid = ndr_unpack(misc.GUID, _guid) + dn = "CN=%s\\0ADEL:%s,CN=Deleted Objects,%s" % (username, guid, self.base_dn) + cn = "%s\nDEL:%s" % (username, guid) + return {'dn': dn, + 'objectClass': '**', + 'cn': cn, + 'distinguishedName': dn, + 'isDeleted': 'TRUE', + 'isRecycled': 'TRUE', + 'instanceType': '4', + 'whenCreated': '**', + 'whenChanged': '**', + 'uSNCreated': '**', + 'uSNChanged': '**', + 'name': cn, + 'objectGUID': _guid, + 'userAccountControl': '546', + 'objectSid': _sid, + 'sAMAccountName': username, + 'lastKnownParent': 'CN=Users,%s' % self.base_dn, + } + + def _expected_user_del_metadata(self): + return [ + (DRSUAPI_ATTID_objectClass, 1), + (DRSUAPI_ATTID_cn, 2), + (DRSUAPI_ATTID_instanceType, 1), + (DRSUAPI_ATTID_whenCreated, 1), + (DRSUAPI_ATTID_isDeleted, 1), + (DRSUAPI_ATTID_ntSecurityDescriptor, 1), + (DRSUAPI_ATTID_name, 2), + (DRSUAPI_ATTID_userAccountControl, None), + (DRSUAPI_ATTID_codePage, 2), + (DRSUAPI_ATTID_countryCode, 2), + (DRSUAPI_ATTID_dBCSPwd, 1), + (DRSUAPI_ATTID_logonHours, 1), + (DRSUAPI_ATTID_unicodePwd, 1), + (DRSUAPI_ATTID_ntPwdHistory, 1), + (DRSUAPI_ATTID_pwdLastSet, 2), + (DRSUAPI_ATTID_primaryGroupID, 2), + (DRSUAPI_ATTID_objectSid, 1), + (DRSUAPI_ATTID_accountExpires, 2), + (DRSUAPI_ATTID_lmPwdHistory, 1), + (DRSUAPI_ATTID_sAMAccountName, 1), + (DRSUAPI_ATTID_sAMAccountType, 2), + (DRSUAPI_ATTID_lastKnownParent, 1), + (DRSUAPI_ATTID_objectCategory, 2), + (DRSUAPI_ATTID_isRecycled, 1)] + + def _expected_user_restore_attributes(self, username, guid, sid, user_dn, category): + return {'dn': user_dn, + 'objectClass': '**', + 'cn': username, + 'distinguishedName': user_dn, + 'instanceType': '4', + 'whenCreated': '**', + 'whenChanged': '**', + 'uSNCreated': '**', + 'uSNChanged': '**', + 'name': username, + 'objectGUID': guid, + 'userAccountControl': '546', + 'badPwdCount': '0', + 'badPasswordTime': '0', + 'codePage': '0', + 'countryCode': '0', + 'lastLogon': '0', + 'lastLogoff': '0', + 'pwdLastSet': '0', + 'primaryGroupID': '513', + 'operatorCount': '0', + 'objectSid': sid, + 'adminCount': '0', + 'accountExpires': '0', + 'logonCount': '0', + 'sAMAccountName': username, + 'sAMAccountType': '805306368', 'lastKnownParent': 'CN=Users,%s' % self.base_dn, 'objectCategory': 'CN=%s,%s' % (category, self.schema_dn) } + def _expected_user_restore_metadata(self): + return [ + (DRSUAPI_ATTID_objectClass, 1), + (DRSUAPI_ATTID_cn, 3), + (DRSUAPI_ATTID_instanceType, 1), + (DRSUAPI_ATTID_whenCreated, 1), + (DRSUAPI_ATTID_isDeleted, 2), + (DRSUAPI_ATTID_ntSecurityDescriptor, 1), + (DRSUAPI_ATTID_name, 3), + (DRSUAPI_ATTID_userAccountControl, None), + (DRSUAPI_ATTID_codePage, 3), + (DRSUAPI_ATTID_countryCode, 3), + (DRSUAPI_ATTID_dBCSPwd, 1), + (DRSUAPI_ATTID_logonHours, 1), + (DRSUAPI_ATTID_unicodePwd, 1), + (DRSUAPI_ATTID_ntPwdHistory, 1), + (DRSUAPI_ATTID_pwdLastSet, 3), + (DRSUAPI_ATTID_primaryGroupID, 3), + (DRSUAPI_ATTID_operatorCount, 1), + (DRSUAPI_ATTID_objectSid, 1), + (DRSUAPI_ATTID_adminCount, 1), + (DRSUAPI_ATTID_accountExpires, 3), + (DRSUAPI_ATTID_lmPwdHistory, 1), + (DRSUAPI_ATTID_sAMAccountName, 1), + (DRSUAPI_ATTID_sAMAccountType, 3), + (DRSUAPI_ATTID_lastKnownParent, 1), + (DRSUAPI_ATTID_objectCategory, 3), + (DRSUAPI_ATTID_isRecycled, 2)] + def test_restore_user(self): print "Test restored user attributes" username = "restore_user" @@ -357,18 +529,31 @@ class RestoreUserObjectTestCase(RestoredObjectAttributesBaseTestCase): "sAMAccountName": username}) obj = self.search_dn(usr_dn) guid = obj["objectGUID"][0] + sid = obj["objectSID"][0] + obj_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"]) + self.assertAttributesExists(self._expected_user_add_attributes(username, usr_dn, "Person"), obj) + self._check_metadata(obj_rmd["replPropertyMetaData"], + self._expected_user_add_metadata()) self.samdb.delete(usr_dn) obj_del = self.search_guid(guid) + obj_del_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"]) + orig_attrs = set(obj.keys()) + del_attrs = set(obj_del.keys()) + self.assertAttributesExists(self._expected_user_del_attributes(username, guid, sid), obj_del) + self._check_metadata(obj_del_rmd["replPropertyMetaData"], + self._expected_user_del_metadata()) # restore the user and fetch what's restored self.restore_deleted_object(self.samdb, obj_del.dn, usr_dn) obj_restore = self.search_guid(guid) + obj_restore_rmd = self.search_guid(guid, attrs=["replPropertyMetaData"]) # check original attributes and restored one are same orig_attrs = set(obj.keys()) # windows restore more attributes that originally we have orig_attrs.update(['adminCount', 'operatorCount', 'lastKnownParent']) rest_attrs = set(obj_restore.keys()) - self.assertAttributesEqual(obj, orig_attrs, obj_restore, rest_attrs) - self.assertAttributesExists(self._expected_user_attributes(username, usr_dn, "Person"), obj_restore) + self.assertAttributesExists(self._expected_user_restore_attributes(username, guid, sid, usr_dn, "Person"), obj_restore) + self._check_metadata(obj_restore_rmd["replPropertyMetaData"], + self._expected_user_restore_metadata()) class RestoreGroupObjectTestCase(RestoredObjectAttributesBaseTestCase): -- cgit v1.2.1