diff options
author | Andrew Bartlett <abartlet@samba.org> | 2016-05-10 16:40:51 +1200 |
---|---|---|
committer | Andrew Bartlett <abartlet@samba.org> | 2016-06-07 10:28:10 +0200 |
commit | 9dcc62eb781fdc05070bc12c43081e086963e4ad (patch) | |
tree | 440adf9de393be509abf9813f060ea10514c246f | |
parent | 5fee4aa90787237d7a7af11febdc45277a896976 (diff) | |
download | samba-9dcc62eb781fdc05070bc12c43081e086963e4ad.tar.gz |
selftest: Add more tests to cover attribute changes vs DN renames
This covers a bug where unrelated attribute changes would reverse a rename
Signed-off-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Garming Sam <garming@catalyst.net.nz
-rw-r--r-- | source4/torture/drs/python/repl_move.py | 660 |
1 files changed, 645 insertions, 15 deletions
diff --git a/source4/torture/drs/python/repl_move.py b/source4/torture/drs/python/repl_move.py index 06cfc02278f..5ccf108195e 100644 --- a/source4/torture/drs/python/repl_move.py +++ b/source4/torture/drs/python/repl_move.py @@ -31,6 +31,7 @@ import time from ldb import ( + SCOPE_BASE, SCOPE_SUBTREE, ) @@ -68,7 +69,12 @@ class DrsMoveObjectTestCase(drs_base.DrsBaseTestCase): self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) def tearDown(self): - self.ldb_dc1.delete(self.ou1_dn, ["tree_delete:1"]) + try: + self.ldb_dc1.delete(self.ou1_dn, ["tree_delete:1"]) + except ldb.LdbError as (enum, string): + if enum == ldb.ERR_NO_SUCH_OBJECT: + pass + self.ldb_dc1.delete(self.ou2_dn, ["tree_delete:1"]) self._enable_inbound_repl(self.dnsname_dc1) self._enable_inbound_repl(self.dnsname_dc2) @@ -86,18 +92,26 @@ class DrsMoveObjectTestCase(drs_base.DrsBaseTestCase): attrs=["*", "parentGUID"]) self.assertEquals(len(res), 1) user_cur = res[0] - # now check properties of the user - cn_orig = obj_orig["cn"][0] - cn_cur = user_cur["cn"][0] + rdn_orig = obj_orig[user_cur.dn.get_rdn_name()][0] + rdn_cur = user_cur[user_cur.dn.get_rdn_name()][0] name_orig = obj_orig["name"][0] name_cur = user_cur["name"][0] dn_orig = obj_orig["dn"] dn_cur = user_cur["dn"] - self.assertFalse("isDeleted" in user_cur) - self.assertEquals(cn_cur, cn_orig) - self.assertEquals(name_cur, name_orig) - self.assertEquals(dn_cur, dn_orig) - self.assertEqual(name_cur, cn_cur) + # now check properties of the user + if is_deleted: + self.assertTrue("isDeleted" in user_cur) + self.assertEquals(rdn_cur.split('\n')[0], rdn_orig) + self.assertEquals(name_cur.split('\n')[0], name_orig) + self.assertEquals(dn_cur.get_rdn_value().split('\n')[0], + dn_orig.get_rdn_value()) + self.assertEqual(name_cur, rdn_cur) + else: + self.assertFalse("isDeleted" in user_cur) + self.assertEquals(rdn_cur, rdn_orig) + self.assertEquals(name_cur, name_orig) + self.assertEquals(dn_cur, dn_orig) + self.assertEqual(name_cur, rdn_cur) self.assertEqual(name_cur, user_cur.dn.get_rdn_value()) return user_cur @@ -110,6 +124,10 @@ class DrsMoveObjectTestCase(drs_base.DrsBaseTestCase): - the OU is renamed - We verify that after replication, that the user has the correct DN (under OU2) + - the OU is deleted + - the OU is modified on DC2 + - We verify that after replication, + that the user has the correct DN (deleted) and has not description """ # work-out unique username to test with @@ -148,10 +166,27 @@ class DrsMoveObjectTestCase(drs_base.DrsBaseTestCase): # delete user on DC1 self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0])) + # Modify description on DC2. This triggers a replication, but + # not of 'name' and so a bug in Samba regarding the DN. + msg = ldb.Message() + msg.dn = new_dn + msg["description"] = ldb.MessageElement("User Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) # trigger replication from DC1 to DC2, for cleanup self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should be delted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=True) + self.assertFalse("description" in user_cur) + + # trigger replication from DC2 to DC1, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + + # check user info on DC1 - should be delted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=True) + self.assertFalse("description" in user_cur) + def test_ReplicateMoveObject2(self): """Verifies how a moved container with a user inside is not @@ -199,6 +234,570 @@ class DrsMoveObjectTestCase(drs_base.DrsBaseTestCase): # trigger replication from DC1 to DC2, for cleanup self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should be delted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=True) + + # trigger replication from DC2 to DC1, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + + # check user info on DC1 - should be delted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=True) + + + def test_ReplicateMoveObject3(self): + """Verifies how a moved container with a user inside is replicated between two DCs. + This test should verify that: + - the OU is created on DC1 + - the OU is renamed on DC1 + - We verify that after replication, + that the user has the correct DN (under OU2). + + """ + # work-out unique username to test with + username = self._make_username() + + # create user on DC1 + self.ldb_dc1.newuser(username=username, + userou="ou=%s" % self.ou1_dn.get_component_value(0), + password=None, setpassword=False) + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username) + self.assertEquals(len(ldb_res), 1) + user_orig = ldb_res[0] + user_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print "Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0])) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False) + + new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username) + new_dn.add_base(self.ou2_dn) + self.ldb_dc1.rename(user_dn, new_dn) + ldb_res = self.ldb_dc1.search(base=self.ou2_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username) + self.assertEquals(len(ldb_res), 1) + + user_moved_orig = ldb_res[0] + user_moved_dn = ldb_res[0]["dn"] + + # trigger replication from DC2 (Which has never seen the object) to DC1 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC1 - should be valid user + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=False) + + # delete user on DC1 + self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0])) + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC2 - should be delted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=True) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + # check user info on DC2 - should be delted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=True) + + + def test_ReplicateMoveObject4(self): + """Verifies how a moved container with a user inside is replicated between two DCs. + This test should verify that: + - the OU is replicated properly + - the user is modified on DC2 + - the OU is renamed on DC1 + - We verify that after replication DC1 -> DC2, + that the user has the correct DN (under OU2), and the description + + """ + # work-out unique username to test with + username = self._make_username() + + # create user on DC1 + self.ldb_dc1.newuser(username=username, + userou="ou=%s" % self.ou1_dn.get_component_value(0), + password=None, setpassword=False) + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username) + self.assertEquals(len(ldb_res), 1) + user_orig = ldb_res[0] + user_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print "Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0])) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False) + + new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username) + new_dn.add_base(self.ou2_dn) + self.ldb_dc1.rename(user_dn, new_dn) + ldb_res = self.ldb_dc1.search(base=self.ou2_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username) + self.assertEquals(len(ldb_res), 1) + + user_moved_orig = ldb_res[0] + user_moved_dn = ldb_res[0]["dn"] + + # Modify description on DC2. This triggers a replication, but + # not of 'name' and so a bug in Samba regarding the DN. + msg = ldb.Message() + msg.dn = user_dn + msg["description"] = ldb.MessageElement("User Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=False) + self.assertTrue("description" in user_cur) + + # delete user on DC1 + self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0])) + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC2 - should be deleted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=True) + self.assertFalse("description" in user_cur) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + # check user info on DC2 - should be deleted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=True) + self.assertFalse("description" in user_cur) + + def test_ReplicateMoveObject5(self): + """Verifies how a moved container with a user inside is replicated between two DCs. + This test should verify that: + - the OU is replicated properly + - the user is modified on DC2 + - the OU is renamed on DC1 + - We verify that after replication DC2 -> DC1, + that the user has the correct DN (under OU2), and the description + + """ + # work-out unique username to test with + username = self._make_username() + + # create user on DC1 + self.ldb_dc1.newuser(username=username, + userou="ou=%s" % self.ou1_dn.get_component_value(0), + password=None, setpassword=False) + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username) + self.assertEquals(len(ldb_res), 1) + user_orig = ldb_res[0] + user_dn = ldb_res[0]["dn"] + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False) + + # check user info on DC1 + print "Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0])) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False) + + new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username) + new_dn.add_base(self.ou2_dn) + self.ldb_dc1.rename(user_dn, new_dn) + ldb_res = self.ldb_dc1.search(base=self.ou2_dn, + scope=SCOPE_SUBTREE, + expression="(samAccountName=%s)" % username) + self.assertEquals(len(ldb_res), 1) + + user_moved_orig = ldb_res[0] + user_moved_dn = ldb_res[0]["dn"] + + # Modify description on DC2. This triggers a replication, but + # not of 'name' and so a bug in Samba regarding the DN. + msg = ldb.Message() + msg.dn = user_dn + msg["description"] = ldb.MessageElement("User Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC1 - should still be valid user + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=False) + self.assertTrue("description" in user_cur) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=False) + self.assertTrue("description" in user_cur) + + # delete user on DC2 + self.ldb_dc2.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0])) + # trigger replication from DC2 to DC1 for cleanup + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + + # check user info on DC1 - should be deleted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=True) + self.assertFalse("description" in user_cur) + + + def test_ReplicateMoveObject6(self): + """Verifies how a moved container is replicated between two DCs. + This test should verify that: + - the OU1 is replicated properly + - the OU1 is modified on DC2 + - the OU1 is renamed on DC1 + - We verify that after replication DC1 -> DC2, + that the OU1 has the correct DN (under OU2), and the description + + """ + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_BASE) + self.assertEquals(len(ldb_res), 1) + ou_orig = ldb_res[0] + ou_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print "Testing for %s with GUID %s" % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0])) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + new_dn = ldb.Dn(self.ldb_dc1, "OU=%s" % self.ou1_dn.get_component_value(0)) + new_dn.add_base(self.ou2_dn) + self.ldb_dc1.rename(ou_dn, new_dn) + ldb_res = self.ldb_dc1.search(base=new_dn, + scope=SCOPE_BASE) + self.assertEquals(len(ldb_res), 1) + + ou_moved_orig = ldb_res[0] + ou_moved_dn = ldb_res[0]["dn"] + + # Modify description on DC2. This triggers a replication, but + # not of 'name' and so a bug in Samba regarding the DN. + msg = ldb.Message() + msg.dn = ou_dn + msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=False) + self.assertTrue("description" in ou_cur) + + # delete OU on DC1 + self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(ou_orig["objectGUID"][0])) + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC2 - should be deleted user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + # check user info on DC2 - should be deleted user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + + def test_ReplicateMoveObject7(self): + """Verifies how a moved container is replicated between two DCs. + This test should verify that: + - the OU1 is replicated properly + - the OU1 is modified on DC2 + - the OU1 is renamed on DC1 to be under OU2 + - We verify that after replication DC2 -> DC1, + that the OU1 has the correct DN (under OU2), and the description + + """ + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_BASE) + self.assertEquals(len(ldb_res), 1) + ou_orig = ldb_res[0] + ou_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print "Testing for %s with GUID %s" % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0])) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + new_dn = ldb.Dn(self.ldb_dc1, "OU=%s" % self.ou1_dn.get_component_value(0)) + new_dn.add_base(self.ou2_dn) + self.ldb_dc1.rename(ou_dn, new_dn) + ldb_res = self.ldb_dc1.search(base=new_dn, + scope=SCOPE_BASE) + self.assertEquals(len(ldb_res), 1) + + ou_moved_orig = ldb_res[0] + ou_moved_dn = ldb_res[0]["dn"] + + # Modify description on DC2. This triggers a replication, but + # not of 'name' and so a bug in Samba regarding the DN. + msg = ldb.Message() + msg.dn = ou_dn + msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC1 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=False) + self.assertTrue("description" in ou_cur) + + # delete OU on DC1 + self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(ou_orig["objectGUID"][0])) + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC2 - should be deleted user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + # check user info on DC2 - should be deleted user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + + def test_ReplicateMoveObject8(self): + """Verifies how a moved container is replicated between two DCs. + This test should verify that: + - the OU1 is replicated properly + - the OU1 is modified on DC2 + - the OU1 is renamed on DC1 to OU1-renamed + - We verify that after replication DC1 -> DC2, + that the OU1 has the correct DN (OU1-renamed), and the description + + """ + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_BASE) + self.assertEquals(len(ldb_res), 1) + ou_orig = ldb_res[0] + ou_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print "Testing for %s with GUID %s" % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0])) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + new_dn = ldb.Dn(self.ldb_dc1, "OU=%s-renamed" % self.ou1_dn.get_component_value(0)) + new_dn.add_base(self.ou1_dn.parent()) + self.ldb_dc1.rename(ou_dn, new_dn) + ldb_res = self.ldb_dc1.search(base=new_dn, + scope=SCOPE_BASE) + self.assertEquals(len(ldb_res), 1) + + ou_moved_orig = ldb_res[0] + ou_moved_dn = ldb_res[0]["dn"] + + # Modify description on DC2. This triggers a replication, but + # not of 'name' and so a bug in Samba regarding the DN. + msg = ldb.Message() + msg.dn = ou_dn + msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=False) + self.assertTrue("description" in ou_cur) + + # delete OU on DC1 + self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(ou_orig["objectGUID"][0])) + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC2 - should be deleted user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + # check user info on DC2 - should be deleted user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + + def test_ReplicateMoveObject9(self): + """Verifies how a moved container is replicated between two DCs. + This test should verify that: + - the OU1 is replicated properly + - the OU1 is modified on DC2 + - the OU1 is renamed on DC1 to be under OU2 + - the OU1 is renamed on DC1 to OU1-renamed + - We verify that after replication DC1 -> DC2, + that the OU1 has the correct DN (OU1-renamed), and the description + + """ + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_BASE) + self.assertEquals(len(ldb_res), 1) + ou_orig = ldb_res[0] + ou_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print "Testing for %s with GUID %s" % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0])) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + new_dn = ldb.Dn(self.ldb_dc1, "OU=%s-renamed" % self.ou1_dn.get_component_value(0)) + new_dn.add_base(self.ou1_dn.parent()) + self.ldb_dc1.rename(ou_dn, new_dn) + ldb_res = self.ldb_dc1.search(base=new_dn, + scope=SCOPE_BASE) + self.assertEquals(len(ldb_res), 1) + + ou_moved_orig = ldb_res[0] + ou_moved_dn = ldb_res[0]["dn"] + + # Modify description on DC2. This triggers a replication, but + # not of 'name' and so a bug in Samba regarding the DN. + msg = ldb.Message() + msg.dn = ou_dn + msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC1 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=False) + self.assertTrue("description" in ou_cur) + + # delete OU on DC1 + self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(ou_orig["objectGUID"][0])) + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC2 - should be deleted user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + # check user info on DC2 - should be deleted user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + + def test_ReplicateMoveObject10(self): + """Verifies how a moved container is replicated between two DCs. + This test should verify that: + - the OU1 is replicated properly + - the OU1 is modified on DC2 + - the OU1 is deleted on DC1 + - We verify that after replication DC1 -> DC2, + that the OU1 is deleted, and the description has gone away + + """ + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_BASE) + self.assertEquals(len(ldb_res), 1) + ou_orig = ldb_res[0] + ou_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print "Testing for %s with GUID %s" % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0])) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + # Modify description on DC2. This triggers a replication, but + # not of 'name' and so a bug in Samba regarding the DN. + msg = ldb.Message() + msg.dn = ou_dn + msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + + # delete OU on DC1 + self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(ou_orig["objectGUID"][0])) + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should be deleted OU + ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + + # check user info on DC2 - should be deleted OU + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + + def test_ReplicateMoveObject11(self): + """Verifies how a moved container is replicated between two DCs. + This test should verify that: + - the OU1 is replicated properly + - the OU1 is modified on DC2 + - the OU1 is deleted on DC1 + - We verify that after replication DC2 -> DC1, + that the OU1 is deleted, and the description has gone away + + """ + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, + scope=SCOPE_BASE) + self.assertEquals(len(ldb_res), 1) + ou_orig = ldb_res[0] + ou_dn = ldb_res[0]["dn"] + + # check user info on DC1 + print "Testing for %s with GUID %s" % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0])) + self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + # trigger replication from DC1 to DC2 + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should still be valid user + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False) + + # Modify description on DC2. This triggers a replication, but + # not of 'name' and so a bug in Samba regarding the DN. + msg = ldb.Message() + msg.dn = ou_dn + msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + + # delete OU on DC1 + self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(ou_orig["objectGUID"][0])) + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC2 - should be deleted OU + ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + # trigger replication from DC1 to DC2, for cleanup + self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + + # check user info on DC2 - should be deleted OU + ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_orig, is_deleted=True) + self.assertFalse("description" in ou_cur) + + class DrsMoveBetweenTreeOfObjectTestCase(drs_base.DrsBaseTestCase): @@ -282,14 +881,28 @@ class DrsMoveBetweenTreeOfObjectTestCase(drs_base.DrsBaseTestCase): attrs=["*", "parentGUID"]) self.assertEquals(len(res), 1) user_cur = res[0] - # now check properties of the user - name_orig = obj_orig["cn"][0] - name_cur = user_cur["cn"][0] + cn_orig = obj_orig["cn"][0] + cn_cur = user_cur["cn"][0] + name_orig = obj_orig["name"][0] + name_cur = user_cur["name"][0] dn_orig = obj_orig["dn"] dn_cur = user_cur["dn"] - self.assertFalse("isDeleted" in user_cur) - self.assertEquals(name_cur, name_orig) - self.assertEquals(dn_cur, dn_orig) + # now check properties of the user + if is_deleted: + self.assertTrue("isDeleted" in user_cur) + self.assertEquals(cn_cur.split('\n')[0], cn_orig) + self.assertEquals(name_cur.split('\n')[0], name_orig) + self.assertEquals(dn_cur.get_rdn_value().split('\n')[0], + dn_orig.get_rdn_value()) + self.assertEqual(name_cur, cn_cur) + else: + self.assertFalse("isDeleted" in user_cur) + self.assertEquals(cn_cur, cn_orig) + self.assertEquals(name_cur, name_orig) + self.assertEquals(dn_cur, dn_orig) + self.assertEqual(name_cur, cn_cur) + self.assertEqual(name_cur, user_cur.dn.get_rdn_value()) + return user_cur @@ -401,10 +1014,17 @@ class DrsMoveBetweenTreeOfObjectTestCase(drs_base.DrsBaseTestCase): # check user info on DC2 - should be valid user user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=False) + # Rename on DC1 new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username) new_dn.add_base(self.ou1_dn) self.ldb_dc1.rename(user_moved_dn, new_dn) + # Modify description on DC2 + msg = ldb.Message() + msg.dn = user_moved_dn + msg["description"] = ldb.MessageElement("User Description", ldb.FLAG_MOD_REPLACE, "description") + self.ldb_dc2.modify(msg) + ldb_res = self.ldb_dc1.search(base=self.ou1_dn, scope=SCOPE_SUBTREE, expression="(samAccountName=%s)" % username) @@ -417,12 +1037,22 @@ class DrsMoveBetweenTreeOfObjectTestCase(drs_base.DrsBaseTestCase): self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) # check user info on DC2 - should be valid user user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=False) + self.assertTrue("description" in user_cur) # delete user on DC1 self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0])) + # trigger replication from DC2 to DC1 + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True) + # check user info on DC1 - should be deleted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=True) + self.assertFalse("description" in user_cur) + # trigger replication from DC1 to DC2, for cleanup self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True) + # check user info on DC2 - should be deleted user + user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=True) + self.assertFalse("description" in user_cur) def test_ReplicateMoveInTree3(self): |