summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Bartlett <abartlet@samba.org>2016-05-10 16:40:51 +1200
committerAndrew Bartlett <abartlet@samba.org>2016-06-07 10:28:10 +0200
commit9dcc62eb781fdc05070bc12c43081e086963e4ad (patch)
tree440adf9de393be509abf9813f060ea10514c246f
parent5fee4aa90787237d7a7af11febdc45277a896976 (diff)
downloadsamba-9dcc62eb781fdc05070bc12c43081e086963e4ad.tar.gz
selftest: Add more tests to cover attribute changes vs DN renames
This covers a bug where unrelated attribute changes would reverse a rename Signed-off-by: Andrew Bartlett <abartlet@samba.org> Reviewed-by: Garming Sam <garming@catalyst.net.nz
-rw-r--r--source4/torture/drs/python/repl_move.py660
1 files changed, 645 insertions, 15 deletions
diff --git a/source4/torture/drs/python/repl_move.py b/source4/torture/drs/python/repl_move.py
index 06cfc02278f..5ccf108195e 100644
--- a/source4/torture/drs/python/repl_move.py
+++ b/source4/torture/drs/python/repl_move.py
@@ -31,6 +31,7 @@ import time
from ldb import (
+ SCOPE_BASE,
SCOPE_SUBTREE,
)
@@ -68,7 +69,12 @@ class DrsMoveObjectTestCase(drs_base.DrsBaseTestCase):
self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
def tearDown(self):
- self.ldb_dc1.delete(self.ou1_dn, ["tree_delete:1"])
+ try:
+ self.ldb_dc1.delete(self.ou1_dn, ["tree_delete:1"])
+ except ldb.LdbError as (enum, string):
+ if enum == ldb.ERR_NO_SUCH_OBJECT:
+ pass
+
self.ldb_dc1.delete(self.ou2_dn, ["tree_delete:1"])
self._enable_inbound_repl(self.dnsname_dc1)
self._enable_inbound_repl(self.dnsname_dc2)
@@ -86,18 +92,26 @@ class DrsMoveObjectTestCase(drs_base.DrsBaseTestCase):
attrs=["*", "parentGUID"])
self.assertEquals(len(res), 1)
user_cur = res[0]
- # now check properties of the user
- cn_orig = obj_orig["cn"][0]
- cn_cur = user_cur["cn"][0]
+ rdn_orig = obj_orig[user_cur.dn.get_rdn_name()][0]
+ rdn_cur = user_cur[user_cur.dn.get_rdn_name()][0]
name_orig = obj_orig["name"][0]
name_cur = user_cur["name"][0]
dn_orig = obj_orig["dn"]
dn_cur = user_cur["dn"]
- self.assertFalse("isDeleted" in user_cur)
- self.assertEquals(cn_cur, cn_orig)
- self.assertEquals(name_cur, name_orig)
- self.assertEquals(dn_cur, dn_orig)
- self.assertEqual(name_cur, cn_cur)
+ # now check properties of the user
+ if is_deleted:
+ self.assertTrue("isDeleted" in user_cur)
+ self.assertEquals(rdn_cur.split('\n')[0], rdn_orig)
+ self.assertEquals(name_cur.split('\n')[0], name_orig)
+ self.assertEquals(dn_cur.get_rdn_value().split('\n')[0],
+ dn_orig.get_rdn_value())
+ self.assertEqual(name_cur, rdn_cur)
+ else:
+ self.assertFalse("isDeleted" in user_cur)
+ self.assertEquals(rdn_cur, rdn_orig)
+ self.assertEquals(name_cur, name_orig)
+ self.assertEquals(dn_cur, dn_orig)
+ self.assertEqual(name_cur, rdn_cur)
self.assertEqual(name_cur, user_cur.dn.get_rdn_value())
return user_cur
@@ -110,6 +124,10 @@ class DrsMoveObjectTestCase(drs_base.DrsBaseTestCase):
- the OU is renamed
- We verify that after replication,
that the user has the correct DN (under OU2)
+ - the OU is deleted
+ - the OU is modified on DC2
+ - We verify that after replication,
+ that the user has the correct DN (deleted) and has not description
"""
# work-out unique username to test with
@@ -148,10 +166,27 @@ class DrsMoveObjectTestCase(drs_base.DrsBaseTestCase):
# delete user on DC1
self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0]))
+ # Modify description on DC2. This triggers a replication, but
+ # not of 'name' and so a bug in Samba regarding the DN.
+ msg = ldb.Message()
+ msg.dn = new_dn
+ msg["description"] = ldb.MessageElement("User Description", ldb.FLAG_MOD_REPLACE, "description")
+ self.ldb_dc2.modify(msg)
# trigger replication from DC1 to DC2, for cleanup
self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+ # check user info on DC2 - should be delted user
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=True)
+ self.assertFalse("description" in user_cur)
+
+ # trigger replication from DC2 to DC1, for cleanup
+ self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
+
+ # check user info on DC1 - should be delted user
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=True)
+ self.assertFalse("description" in user_cur)
+
def test_ReplicateMoveObject2(self):
"""Verifies how a moved container with a user inside is not
@@ -199,6 +234,570 @@ class DrsMoveObjectTestCase(drs_base.DrsBaseTestCase):
# trigger replication from DC1 to DC2, for cleanup
self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+ # check user info on DC2 - should be delted user
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_orig, is_deleted=True)
+
+ # trigger replication from DC2 to DC1, for cleanup
+ self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
+
+ # check user info on DC1 - should be delted user
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=True)
+
+
+ def test_ReplicateMoveObject3(self):
+ """Verifies how a moved container with a user inside is replicated between two DCs.
+ This test should verify that:
+ - the OU is created on DC1
+ - the OU is renamed on DC1
+ - We verify that after replication,
+ that the user has the correct DN (under OU2).
+
+ """
+ # work-out unique username to test with
+ username = self._make_username()
+
+ # create user on DC1
+ self.ldb_dc1.newuser(username=username,
+ userou="ou=%s" % self.ou1_dn.get_component_value(0),
+ password=None, setpassword=False)
+ ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
+ scope=SCOPE_SUBTREE,
+ expression="(samAccountName=%s)" % username)
+ self.assertEquals(len(ldb_res), 1)
+ user_orig = ldb_res[0]
+ user_dn = ldb_res[0]["dn"]
+
+ # check user info on DC1
+ print "Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0]))
+ self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False)
+
+ new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username)
+ new_dn.add_base(self.ou2_dn)
+ self.ldb_dc1.rename(user_dn, new_dn)
+ ldb_res = self.ldb_dc1.search(base=self.ou2_dn,
+ scope=SCOPE_SUBTREE,
+ expression="(samAccountName=%s)" % username)
+ self.assertEquals(len(ldb_res), 1)
+
+ user_moved_orig = ldb_res[0]
+ user_moved_dn = ldb_res[0]["dn"]
+
+ # trigger replication from DC2 (Which has never seen the object) to DC1
+ self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+ # check user info on DC1 - should be valid user
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=False)
+
+ # delete user on DC1
+ self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0]))
+ # trigger replication from DC2 to DC1
+ self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
+ # check user info on DC2 - should be delted user
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=True)
+
+ # trigger replication from DC1 to DC2, for cleanup
+ self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+
+ # check user info on DC2 - should be delted user
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=True)
+
+
+ def test_ReplicateMoveObject4(self):
+ """Verifies how a moved container with a user inside is replicated between two DCs.
+ This test should verify that:
+ - the OU is replicated properly
+ - the user is modified on DC2
+ - the OU is renamed on DC1
+ - We verify that after replication DC1 -> DC2,
+ that the user has the correct DN (under OU2), and the description
+
+ """
+ # work-out unique username to test with
+ username = self._make_username()
+
+ # create user on DC1
+ self.ldb_dc1.newuser(username=username,
+ userou="ou=%s" % self.ou1_dn.get_component_value(0),
+ password=None, setpassword=False)
+ ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
+ scope=SCOPE_SUBTREE,
+ expression="(samAccountName=%s)" % username)
+ self.assertEquals(len(ldb_res), 1)
+ user_orig = ldb_res[0]
+ user_dn = ldb_res[0]["dn"]
+
+ # check user info on DC1
+ print "Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0]))
+ self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False)
+
+ # trigger replication from DC1 to DC2
+ self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+ # check user info on DC2 - should still be valid user
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False)
+
+ new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username)
+ new_dn.add_base(self.ou2_dn)
+ self.ldb_dc1.rename(user_dn, new_dn)
+ ldb_res = self.ldb_dc1.search(base=self.ou2_dn,
+ scope=SCOPE_SUBTREE,
+ expression="(samAccountName=%s)" % username)
+ self.assertEquals(len(ldb_res), 1)
+
+ user_moved_orig = ldb_res[0]
+ user_moved_dn = ldb_res[0]["dn"]
+
+ # Modify description on DC2. This triggers a replication, but
+ # not of 'name' and so a bug in Samba regarding the DN.
+ msg = ldb.Message()
+ msg.dn = user_dn
+ msg["description"] = ldb.MessageElement("User Description", ldb.FLAG_MOD_REPLACE, "description")
+ self.ldb_dc2.modify(msg)
+
+ # trigger replication from DC1 to DC2
+ self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+ # check user info on DC2 - should still be valid user
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=False)
+ self.assertTrue("description" in user_cur)
+
+ # delete user on DC1
+ self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0]))
+ # trigger replication from DC2 to DC1
+ self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
+ # check user info on DC2 - should be deleted user
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=True)
+ self.assertFalse("description" in user_cur)
+
+ # trigger replication from DC1 to DC2, for cleanup
+ self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+
+ # check user info on DC2 - should be deleted user
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=True)
+ self.assertFalse("description" in user_cur)
+
+ def test_ReplicateMoveObject5(self):
+ """Verifies how a moved container with a user inside is replicated between two DCs.
+ This test should verify that:
+ - the OU is replicated properly
+ - the user is modified on DC2
+ - the OU is renamed on DC1
+ - We verify that after replication DC2 -> DC1,
+ that the user has the correct DN (under OU2), and the description
+
+ """
+ # work-out unique username to test with
+ username = self._make_username()
+
+ # create user on DC1
+ self.ldb_dc1.newuser(username=username,
+ userou="ou=%s" % self.ou1_dn.get_component_value(0),
+ password=None, setpassword=False)
+ ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
+ scope=SCOPE_SUBTREE,
+ expression="(samAccountName=%s)" % username)
+ self.assertEquals(len(ldb_res), 1)
+ user_orig = ldb_res[0]
+ user_dn = ldb_res[0]["dn"]
+
+ # trigger replication from DC1 to DC2
+ self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+ # check user info on DC2 - should still be valid user
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False)
+
+ # check user info on DC1
+ print "Testing for %s with GUID %s" % (username, self._GUID_string(user_orig["objectGUID"][0]))
+ self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_orig, is_deleted=False)
+
+ new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username)
+ new_dn.add_base(self.ou2_dn)
+ self.ldb_dc1.rename(user_dn, new_dn)
+ ldb_res = self.ldb_dc1.search(base=self.ou2_dn,
+ scope=SCOPE_SUBTREE,
+ expression="(samAccountName=%s)" % username)
+ self.assertEquals(len(ldb_res), 1)
+
+ user_moved_orig = ldb_res[0]
+ user_moved_dn = ldb_res[0]["dn"]
+
+ # Modify description on DC2. This triggers a replication, but
+ # not of 'name' and so a bug in Samba regarding the DN.
+ msg = ldb.Message()
+ msg.dn = user_dn
+ msg["description"] = ldb.MessageElement("User Description", ldb.FLAG_MOD_REPLACE, "description")
+ self.ldb_dc2.modify(msg)
+
+ # trigger replication from DC2 to DC1
+ self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
+ # check user info on DC1 - should still be valid user
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=False)
+ self.assertTrue("description" in user_cur)
+
+ # trigger replication from DC1 to DC2
+ self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=False)
+ self.assertTrue("description" in user_cur)
+
+ # delete user on DC2
+ self.ldb_dc2.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0]))
+ # trigger replication from DC2 to DC1 for cleanup
+ self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
+
+ # check user info on DC1 - should be deleted user
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=True)
+ self.assertFalse("description" in user_cur)
+
+
+ def test_ReplicateMoveObject6(self):
+ """Verifies how a moved container is replicated between two DCs.
+ This test should verify that:
+ - the OU1 is replicated properly
+ - the OU1 is modified on DC2
+ - the OU1 is renamed on DC1
+ - We verify that after replication DC1 -> DC2,
+ that the OU1 has the correct DN (under OU2), and the description
+
+ """
+ ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
+ scope=SCOPE_BASE)
+ self.assertEquals(len(ldb_res), 1)
+ ou_orig = ldb_res[0]
+ ou_dn = ldb_res[0]["dn"]
+
+ # check user info on DC1
+ print "Testing for %s with GUID %s" % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0]))
+ self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False)
+
+ # trigger replication from DC1 to DC2
+ self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+ # check user info on DC2 - should still be valid user
+ ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False)
+
+ new_dn = ldb.Dn(self.ldb_dc1, "OU=%s" % self.ou1_dn.get_component_value(0))
+ new_dn.add_base(self.ou2_dn)
+ self.ldb_dc1.rename(ou_dn, new_dn)
+ ldb_res = self.ldb_dc1.search(base=new_dn,
+ scope=SCOPE_BASE)
+ self.assertEquals(len(ldb_res), 1)
+
+ ou_moved_orig = ldb_res[0]
+ ou_moved_dn = ldb_res[0]["dn"]
+
+ # Modify description on DC2. This triggers a replication, but
+ # not of 'name' and so a bug in Samba regarding the DN.
+ msg = ldb.Message()
+ msg.dn = ou_dn
+ msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description")
+ self.ldb_dc2.modify(msg)
+
+ # trigger replication from DC1 to DC2
+ self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+ # check user info on DC2 - should still be valid user
+ ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=False)
+ self.assertTrue("description" in ou_cur)
+
+ # delete OU on DC1
+ self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(ou_orig["objectGUID"][0]))
+ # trigger replication from DC2 to DC1
+ self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
+ # check user info on DC2 - should be deleted user
+ ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=True)
+ self.assertFalse("description" in ou_cur)
+
+ # trigger replication from DC1 to DC2, for cleanup
+ self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+
+ # check user info on DC2 - should be deleted user
+ ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=True)
+ self.assertFalse("description" in ou_cur)
+
+
+ def test_ReplicateMoveObject7(self):
+ """Verifies how a moved container is replicated between two DCs.
+ This test should verify that:
+ - the OU1 is replicated properly
+ - the OU1 is modified on DC2
+ - the OU1 is renamed on DC1 to be under OU2
+ - We verify that after replication DC2 -> DC1,
+ that the OU1 has the correct DN (under OU2), and the description
+
+ """
+ ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
+ scope=SCOPE_BASE)
+ self.assertEquals(len(ldb_res), 1)
+ ou_orig = ldb_res[0]
+ ou_dn = ldb_res[0]["dn"]
+
+ # check user info on DC1
+ print "Testing for %s with GUID %s" % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0]))
+ self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False)
+
+ # trigger replication from DC1 to DC2
+ self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+ # check user info on DC2 - should still be valid user
+ ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False)
+
+ new_dn = ldb.Dn(self.ldb_dc1, "OU=%s" % self.ou1_dn.get_component_value(0))
+ new_dn.add_base(self.ou2_dn)
+ self.ldb_dc1.rename(ou_dn, new_dn)
+ ldb_res = self.ldb_dc1.search(base=new_dn,
+ scope=SCOPE_BASE)
+ self.assertEquals(len(ldb_res), 1)
+
+ ou_moved_orig = ldb_res[0]
+ ou_moved_dn = ldb_res[0]["dn"]
+
+ # Modify description on DC2. This triggers a replication, but
+ # not of 'name' and so a bug in Samba regarding the DN.
+ msg = ldb.Message()
+ msg.dn = ou_dn
+ msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description")
+ self.ldb_dc2.modify(msg)
+
+ # trigger replication from DC2 to DC1
+ self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
+ # check user info on DC1 - should still be valid user
+ ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=False)
+ self.assertTrue("description" in ou_cur)
+
+ # delete OU on DC1
+ self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(ou_orig["objectGUID"][0]))
+ # trigger replication from DC2 to DC1
+ self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
+ # check user info on DC2 - should be deleted user
+ ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=True)
+ self.assertFalse("description" in ou_cur)
+
+ # trigger replication from DC1 to DC2, for cleanup
+ self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+
+ # check user info on DC2 - should be deleted user
+ ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=True)
+ self.assertFalse("description" in ou_cur)
+
+
+ def test_ReplicateMoveObject8(self):
+ """Verifies how a moved container is replicated between two DCs.
+ This test should verify that:
+ - the OU1 is replicated properly
+ - the OU1 is modified on DC2
+ - the OU1 is renamed on DC1 to OU1-renamed
+ - We verify that after replication DC1 -> DC2,
+ that the OU1 has the correct DN (OU1-renamed), and the description
+
+ """
+ ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
+ scope=SCOPE_BASE)
+ self.assertEquals(len(ldb_res), 1)
+ ou_orig = ldb_res[0]
+ ou_dn = ldb_res[0]["dn"]
+
+ # check user info on DC1
+ print "Testing for %s with GUID %s" % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0]))
+ self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False)
+
+ # trigger replication from DC1 to DC2
+ self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+ # check user info on DC2 - should still be valid user
+ ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False)
+
+ new_dn = ldb.Dn(self.ldb_dc1, "OU=%s-renamed" % self.ou1_dn.get_component_value(0))
+ new_dn.add_base(self.ou1_dn.parent())
+ self.ldb_dc1.rename(ou_dn, new_dn)
+ ldb_res = self.ldb_dc1.search(base=new_dn,
+ scope=SCOPE_BASE)
+ self.assertEquals(len(ldb_res), 1)
+
+ ou_moved_orig = ldb_res[0]
+ ou_moved_dn = ldb_res[0]["dn"]
+
+ # Modify description on DC2. This triggers a replication, but
+ # not of 'name' and so a bug in Samba regarding the DN.
+ msg = ldb.Message()
+ msg.dn = ou_dn
+ msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description")
+ self.ldb_dc2.modify(msg)
+
+ # trigger replication from DC1 to DC2
+ self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+ # check user info on DC2 - should still be valid user
+ ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=False)
+ self.assertTrue("description" in ou_cur)
+
+ # delete OU on DC1
+ self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(ou_orig["objectGUID"][0]))
+ # trigger replication from DC2 to DC1
+ self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
+ # check user info on DC2 - should be deleted user
+ ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=True)
+ self.assertFalse("description" in ou_cur)
+
+ # trigger replication from DC1 to DC2, for cleanup
+ self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+
+ # check user info on DC2 - should be deleted user
+ ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=True)
+ self.assertFalse("description" in ou_cur)
+
+
+ def test_ReplicateMoveObject9(self):
+ """Verifies how a moved container is replicated between two DCs.
+ This test should verify that:
+ - the OU1 is replicated properly
+ - the OU1 is modified on DC2
+ - the OU1 is renamed on DC1 to be under OU2
+ - the OU1 is renamed on DC1 to OU1-renamed
+ - We verify that after replication DC1 -> DC2,
+ that the OU1 has the correct DN (OU1-renamed), and the description
+
+ """
+ ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
+ scope=SCOPE_BASE)
+ self.assertEquals(len(ldb_res), 1)
+ ou_orig = ldb_res[0]
+ ou_dn = ldb_res[0]["dn"]
+
+ # check user info on DC1
+ print "Testing for %s with GUID %s" % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0]))
+ self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False)
+
+ # trigger replication from DC1 to DC2
+ self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+ # check user info on DC2 - should still be valid user
+ ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False)
+
+ new_dn = ldb.Dn(self.ldb_dc1, "OU=%s-renamed" % self.ou1_dn.get_component_value(0))
+ new_dn.add_base(self.ou1_dn.parent())
+ self.ldb_dc1.rename(ou_dn, new_dn)
+ ldb_res = self.ldb_dc1.search(base=new_dn,
+ scope=SCOPE_BASE)
+ self.assertEquals(len(ldb_res), 1)
+
+ ou_moved_orig = ldb_res[0]
+ ou_moved_dn = ldb_res[0]["dn"]
+
+ # Modify description on DC2. This triggers a replication, but
+ # not of 'name' and so a bug in Samba regarding the DN.
+ msg = ldb.Message()
+ msg.dn = ou_dn
+ msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description")
+ self.ldb_dc2.modify(msg)
+
+ # trigger replication from DC2 to DC1
+ self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
+ # check user info on DC1 - should still be valid user
+ ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=False)
+ self.assertTrue("description" in ou_cur)
+
+ # delete OU on DC1
+ self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(ou_orig["objectGUID"][0]))
+ # trigger replication from DC2 to DC1
+ self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
+ # check user info on DC2 - should be deleted user
+ ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_moved_orig, is_deleted=True)
+ self.assertFalse("description" in ou_cur)
+
+ # trigger replication from DC1 to DC2, for cleanup
+ self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+
+ # check user info on DC2 - should be deleted user
+ ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_moved_orig, is_deleted=True)
+ self.assertFalse("description" in ou_cur)
+
+
+ def test_ReplicateMoveObject10(self):
+ """Verifies how a moved container is replicated between two DCs.
+ This test should verify that:
+ - the OU1 is replicated properly
+ - the OU1 is modified on DC2
+ - the OU1 is deleted on DC1
+ - We verify that after replication DC1 -> DC2,
+ that the OU1 is deleted, and the description has gone away
+
+ """
+ ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
+ scope=SCOPE_BASE)
+ self.assertEquals(len(ldb_res), 1)
+ ou_orig = ldb_res[0]
+ ou_dn = ldb_res[0]["dn"]
+
+ # check user info on DC1
+ print "Testing for %s with GUID %s" % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0]))
+ self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False)
+
+ # trigger replication from DC1 to DC2
+ self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+ # check user info on DC2 - should still be valid user
+ ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False)
+
+ # Modify description on DC2. This triggers a replication, but
+ # not of 'name' and so a bug in Samba regarding the DN.
+ msg = ldb.Message()
+ msg.dn = ou_dn
+ msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description")
+ self.ldb_dc2.modify(msg)
+
+ # delete OU on DC1
+ self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(ou_orig["objectGUID"][0]))
+ # trigger replication from DC1 to DC2
+ self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+ # check user info on DC2 - should be deleted OU
+ ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_orig, is_deleted=True)
+ self.assertFalse("description" in ou_cur)
+
+ # trigger replication from DC1 to DC2, for cleanup
+ self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
+
+ # check user info on DC2 - should be deleted OU
+ ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=True)
+ self.assertFalse("description" in ou_cur)
+
+
+ def test_ReplicateMoveObject11(self):
+ """Verifies how a moved container is replicated between two DCs.
+ This test should verify that:
+ - the OU1 is replicated properly
+ - the OU1 is modified on DC2
+ - the OU1 is deleted on DC1
+ - We verify that after replication DC2 -> DC1,
+ that the OU1 is deleted, and the description has gone away
+
+ """
+ ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
+ scope=SCOPE_BASE)
+ self.assertEquals(len(ldb_res), 1)
+ ou_orig = ldb_res[0]
+ ou_dn = ldb_res[0]["dn"]
+
+ # check user info on DC1
+ print "Testing for %s with GUID %s" % (self.ou1_dn, self._GUID_string(ou_orig["objectGUID"][0]))
+ self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False)
+
+ # trigger replication from DC1 to DC2
+ self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+ # check user info on DC2 - should still be valid user
+ ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=False)
+
+ # Modify description on DC2. This triggers a replication, but
+ # not of 'name' and so a bug in Samba regarding the DN.
+ msg = ldb.Message()
+ msg.dn = ou_dn
+ msg["description"] = ldb.MessageElement("OU Description", ldb.FLAG_MOD_REPLACE, "description")
+ self.ldb_dc2.modify(msg)
+
+ # delete OU on DC1
+ self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(ou_orig["objectGUID"][0]))
+ # trigger replication from DC2 to DC1
+ self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
+ # check user info on DC2 - should be deleted OU
+ ou_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=ou_orig, is_deleted=True)
+ self.assertFalse("description" in ou_cur)
+
+ # trigger replication from DC1 to DC2, for cleanup
+ self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+
+ # check user info on DC2 - should be deleted OU
+ ou_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=ou_orig, is_deleted=True)
+ self.assertFalse("description" in ou_cur)
+
+
class DrsMoveBetweenTreeOfObjectTestCase(drs_base.DrsBaseTestCase):
@@ -282,14 +881,28 @@ class DrsMoveBetweenTreeOfObjectTestCase(drs_base.DrsBaseTestCase):
attrs=["*", "parentGUID"])
self.assertEquals(len(res), 1)
user_cur = res[0]
- # now check properties of the user
- name_orig = obj_orig["cn"][0]
- name_cur = user_cur["cn"][0]
+ cn_orig = obj_orig["cn"][0]
+ cn_cur = user_cur["cn"][0]
+ name_orig = obj_orig["name"][0]
+ name_cur = user_cur["name"][0]
dn_orig = obj_orig["dn"]
dn_cur = user_cur["dn"]
- self.assertFalse("isDeleted" in user_cur)
- self.assertEquals(name_cur, name_orig)
- self.assertEquals(dn_cur, dn_orig)
+ # now check properties of the user
+ if is_deleted:
+ self.assertTrue("isDeleted" in user_cur)
+ self.assertEquals(cn_cur.split('\n')[0], cn_orig)
+ self.assertEquals(name_cur.split('\n')[0], name_orig)
+ self.assertEquals(dn_cur.get_rdn_value().split('\n')[0],
+ dn_orig.get_rdn_value())
+ self.assertEqual(name_cur, cn_cur)
+ else:
+ self.assertFalse("isDeleted" in user_cur)
+ self.assertEquals(cn_cur, cn_orig)
+ self.assertEquals(name_cur, name_orig)
+ self.assertEquals(dn_cur, dn_orig)
+ self.assertEqual(name_cur, cn_cur)
+ self.assertEqual(name_cur, user_cur.dn.get_rdn_value())
+
return user_cur
@@ -401,10 +1014,17 @@ class DrsMoveBetweenTreeOfObjectTestCase(drs_base.DrsBaseTestCase):
# check user info on DC2 - should be valid user
user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=False)
+ # Rename on DC1
new_dn = ldb.Dn(self.ldb_dc1, "CN=%s" % username)
new_dn.add_base(self.ou1_dn)
self.ldb_dc1.rename(user_moved_dn, new_dn)
+ # Modify description on DC2
+ msg = ldb.Message()
+ msg.dn = user_moved_dn
+ msg["description"] = ldb.MessageElement("User Description", ldb.FLAG_MOD_REPLACE, "description")
+ self.ldb_dc2.modify(msg)
+
ldb_res = self.ldb_dc1.search(base=self.ou1_dn,
scope=SCOPE_SUBTREE,
expression="(samAccountName=%s)" % username)
@@ -417,12 +1037,22 @@ class DrsMoveBetweenTreeOfObjectTestCase(drs_base.DrsBaseTestCase):
self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
# check user info on DC2 - should be valid user
user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=False)
+ self.assertTrue("description" in user_cur)
# delete user on DC1
self.ldb_dc1.delete('<GUID=%s>' % self._GUID_string(user_orig["objectGUID"][0]))
+ # trigger replication from DC2 to DC1
+ self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
+ # check user info on DC1 - should be deleted user
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc1, obj_orig=user_moved_orig, is_deleted=True)
+ self.assertFalse("description" in user_cur)
+
# trigger replication from DC1 to DC2, for cleanup
self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
+ # check user info on DC2 - should be deleted user
+ user_cur = self._check_obj(sam_ldb=self.ldb_dc2, obj_orig=user_moved_orig, is_deleted=True)
+ self.assertFalse("description" in user_cur)
def test_ReplicateMoveInTree3(self):