summaryrefslogtreecommitdiff
path: root/source4/torture/drs/python/getnc_exop.py
diff options
context:
space:
mode:
Diffstat (limited to 'source4/torture/drs/python/getnc_exop.py')
-rw-r--r--source4/torture/drs/python/getnc_exop.py80
1 files changed, 74 insertions, 6 deletions
diff --git a/source4/torture/drs/python/getnc_exop.py b/source4/torture/drs/python/getnc_exop.py
index 885156cc6e2..9c16ecca323 100644
--- a/source4/torture/drs/python/getnc_exop.py
+++ b/source4/torture/drs/python/getnc_exop.py
@@ -241,8 +241,8 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
(enum, estr) = e1.args
self.assertEqual(enum, werror.WERR_DS_CANT_FIND_EXPECTED_NC)
- def test_InvalidNC_DummyDN_InvalidGUID(self):
- """Test full replication on a totally invalid GUID fails with the right error code"""
+ def test_InvalidNC_DummyDN_InvalidGUID_REPL_OBJ(self):
+ """Test single object replication on a totally invalid GUID fails with the right error code"""
fsmo_dn = self.ldb_dc1.get_schema_basedn()
(fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
@@ -250,16 +250,16 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
invocation_id=fsmo_owner["invocation_id"],
nc_dn_str="DummyDN",
nc_guid=misc.GUID("c2d2f745-1610-4e93-964b-d4ba73eb32f8"),
- exop=drsuapi.DRSUAPI_EXOP_NONE)
+ exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)
(drs, drs_handle) = self._ds_bind(self.dnsname_dc1, ip=self.url_dc1)
try:
(level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
except WERRORError as e1:
(enum, estr) = e1.args
- self.assertEqual(enum, werror.WERR_DS_DRA_BAD_NC)
+ self.assertEqual(enum, werror.WERR_DS_DRA_BAD_DN)
- def test_InvalidNC_DummyDN_InvalidGUID_REPL_OBJ(self):
+ def test_InvalidNC_DummyDN_InvalidGUID_REPL_SECRET(self):
"""Test single object replication on a totally invalid GUID fails with the right error code"""
fsmo_dn = self.ldb_dc1.get_schema_basedn()
(fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
@@ -295,6 +295,52 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
(enum, estr) = e1.args
self.assertEqual(enum, werror.WERR_DS_DRA_BAD_NC)
+ def test_DummyDN_valid_GUID_REPL_OBJ(self):
+ dc_guid_1 = self.ldb_dc1.get_invocation_id()
+ drs, drs_handle = self._ds_bind(self.dnsname_dc1, ip=self.url_dc1)
+
+ res = self.ldb_dc1.search(base=self.ou, scope=SCOPE_BASE,
+ attrs=["objectGUID"])
+
+ guid = misc.GUID(res[0]["objectGUID"][0])
+
+ req8 = self._exop_req8(dest_dsa=None,
+ invocation_id=dc_guid_1,
+ nc_dn_str="DummyDN",
+ nc_guid=guid,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)
+
+ try:
+ (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
+ except WERRORError as e1:
+ (enum, estr) = e1.args
+ self.fail(f"Failed to call GetNCChanges with EXOP_REPL_OBJ and a GUID: {estr}")
+
+ self.assertEqual(ctr.first_object.object.identifier.guid, guid)
+
+ def test_DummyDN_valid_GUID_REPL_SECRET(self):
+ dc_guid_1 = self.ldb_dc1.get_invocation_id()
+ drs, drs_handle = self._ds_bind(self.dnsname_dc1, ip=self.url_dc1)
+
+ res = self.ldb_dc1.search(base=self.ou, scope=SCOPE_BASE,
+ attrs=["objectGUID"])
+
+ guid = misc.GUID(res[0]["objectGUID"][0])
+
+ req8 = self._exop_req8(dest_dsa=None,
+ invocation_id=dc_guid_1,
+ nc_dn_str="DummyDN",
+ nc_guid=guid,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET)
+
+ try:
+ (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
+ except WERRORError as e1:
+ (enum, estr) = e1.args
+
+ # We expect to get as far as failing on the missing dest_dsa
+ self.assertEqual(enum, werror.WERR_DS_DRA_DB_ERROR)
+
def test_link_utdv_hwm(self):
"""Test verify the DRS_GET_ANC behavior."""
@@ -668,7 +714,29 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
(level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
except WERRORError as e1:
(enum, estr) = e1.args
- self.fail("DsGetNCChanges failed with {estr}")
+ self.fail(f"DsGetNCChanges failed with {estr}")
+ self.assertEqual(level, 6, "Expected level 6 response!")
+ self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
+ self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
+ self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
+
+ def test_InvalidDestDSA_and_GUID_RID_ALLOC(self):
+ """Test role transfer with invalid destination DSA guid"""
+ fsmo_dn = self.ldb_dc1.get_schema_basedn()
+ (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
+
+ req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
+ invocation_id=fsmo_owner["invocation_id"],
+ nc_dn_str="DummyDN",
+ nc_guid=misc.GUID("c2d2f745-1610-4e93-964b-d4ba73eb32f8"),
+ exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
+
+ (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
+ try:
+ (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
+ except WERRORError as e1:
+ (enum, estr) = e1.args
+ self.fail(f"DsGetNCChanges failed with {estr}")
self.assertEqual(level, 6, "Expected level 6 response!")
self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))