diff options
Diffstat (limited to 'source4/torture/drs/python/getnc_unpriv.py')
-rw-r--r-- | source4/torture/drs/python/getnc_unpriv.py | 212 |
1 files changed, 181 insertions, 31 deletions
diff --git a/source4/torture/drs/python/getnc_unpriv.py b/source4/torture/drs/python/getnc_unpriv.py index dd6142aa816..41d96110492 100644 --- a/source4/torture/drs/python/getnc_unpriv.py +++ b/source4/torture/drs/python/getnc_unpriv.py @@ -1,7 +1,12 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- # -# Tests replication scenarios with different user privileges +# Tests replication scenarios with different user privileges. +# We want to test every replication scenario we can think of against: +# - users with only GET_CHANGES privileges +# - users with only GET_ALL_CHANGES privileges +# - users with both GET_CHANGES and GET_ALL_CHANGES privileges +# - users with no privileges # # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017 @@ -30,12 +35,13 @@ import drs_base import samba.tests +from samba import werror, WERRORError from samba import sd_utils import ldb from ldb import SCOPE_BASE -from samba.dcerpc import drsuapi +from samba.dcerpc import drsuapi, security from samba.credentials import DONT_USE_KERBEROS class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase): @@ -55,10 +61,13 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase): (self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1) self.sd_utils = sd_utils.SDUtils(self.ldb_dc1) - user_dn = "cn=%s,%s" % (self.get_changes_user, self.ou) - user_sid = self.sd_utils.get_object_sid(user_dn) - mod = "(A;;CR;1131f6aa-9c07-11d1-f79f-00c04fc2dcd2;;%s)" % str(user_sid) - self.sd_utils.dacl_add_ace(self.base_dn, mod) + self.user_dn = "cn=%s,%s" % (self.get_changes_user, self.ou) + user_sid = self.sd_utils.get_object_sid(self.user_dn) + self.acl_mod_get_changes = "(OA;;CR;%s;;%s)" % (security.GUID_DRS_GET_CHANGES, + str(user_sid)) + self.acl_mod_get_all_changes = "(OA;;CR;%s;;%s)" % (security.GUID_DRS_GET_ALL_CHANGES, + str(user_sid)) + self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn) # We set DONT_USE_KERBEROS to avoid a race with getting the # user replicated to our selected KDC @@ -70,6 +79,7 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase): self.user_creds) def tearDown(self): + self.sd_utils.modify_sd_on_dn(self.base_dn, self.desc_sddl) try: self.ldb_dc1.delete(self.ou, ["tree_delete:1"]) except ldb.LdbError as (enum, string): @@ -77,35 +87,175 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase): pass super(DrsReplicaSyncUnprivTestCase, self).tearDown() - def test_do_single_repl(self): + def _test_repl_exop(self, exop, repl_obj, expected_error, dest_dsa=None, + partial_attribute_set=None): """ - Make sure that DRSU_EXOP_REPL_OBJ works as a less-privileged - user with the correct GET_CHANGES rights + Common function to send a replication request and check the result + matches what's expected. """ - - ou1 = "OU=single_obj,%s" % self.ou - self.ldb_dc1.add({ - "dn": ou1, - "objectclass": "organizationalUnit" - }) - req8 = self._exop_req8(dest_dsa=None, + req8 = self._exop_req8(dest_dsa=dest_dsa, invocation_id=self.ldb_dc1.get_invocation_id(), - nc_dn_str=ou1, - exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ, - replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP) - (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8) - self._check_ctr6(ctr, [ou1]) + nc_dn_str=repl_obj, + exop=exop, + replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP, + partial_attribute_set=partial_attribute_set) + + if expected_error is None: + # user is OK, request should be accepted without throwing an error + (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, + 8, req8) + else: + # check the request is rejected (with the error we're expecting) + try: + (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, + 8, req8) + self.fail("Should have failed with user denied access") + except WERRORError as (enum, estr): + self.assertEquals(enum, expected_error, + "Got unexpected error: %s" % estr) - def test_do_full_repl(self): + def _test_repl_single_obj(self, repl_obj, expected_error, + partial_attribute_set=None): """ - Make sure that full replication works as a less-privileged - user with the correct GET_CHANGES rights + Checks that replication on a single object either succeeds or fails as + expected (based on the user's access rights) """ + self._test_repl_exop(exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ, + repl_obj=repl_obj, + expected_error=expected_error, + partial_attribute_set=partial_attribute_set) + + def _test_repl_secret(self, repl_obj, expected_error, dest_dsa=None): + """ + Checks that REPL_SECRET on an object either succeeds or fails as + expected (based on the user's access rights) + """ + self._test_repl_exop(exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET, + repl_obj=repl_obj, + expected_error=expected_error, + dest_dsa=dest_dsa) + + def _test_repl_full(self, expected_error, partial_attribute_set=None): + """ + Checks that a full replication either succeeds or fails as expected + (based on the user's access rights) + """ + self._test_repl_exop(exop=drsuapi.DRSUAPI_EXOP_NONE, + repl_obj=self.ldb_dc1.get_default_basedn(), + expected_error=expected_error, + partial_attribute_set=partial_attribute_set) + + def _test_repl_full_on_ou(self, expected_error): + """ + Full replication on a specific OU should always fail (it should be done + against a base NC). The error may vary based on the user's access rights + """ + # Just try against the OU created in the test setup + self._test_repl_exop(exop=drsuapi.DRSUAPI_EXOP_NONE, + repl_obj=self.ou, + expected_error=expected_error) + + def test_repl_getchanges_userpriv(self): + """ + Tests various replication requests made by a user with only GET_CHANGES + rights. Some requests will be accepted, but most will be rejected. + """ + + # Assign the user GET_CHANGES rights + self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod_get_changes) + + self._test_repl_single_obj(repl_obj=self.ou, + expected_error=werror.WERR_DS_DRA_ACCESS_DENIED) + + self._test_repl_secret(repl_obj=self.ou, + expected_error=werror.WERR_DS_DRA_ACCESS_DENIED) + self._test_repl_secret(repl_obj=self.user_dn, + expected_error=werror.WERR_DS_DRA_ACCESS_DENIED) + self._test_repl_secret(repl_obj=self.user_dn, + dest_dsa=self.ldb_dc1.get_ntds_GUID(), + expected_error=werror.WERR_DS_DRA_ACCESS_DENIED) + + self._test_repl_full(expected_error=werror.WERR_DS_DRA_ACCESS_DENIED) + self._test_repl_full_on_ou(expected_error=werror.WERR_DS_CANT_FIND_EXPECTED_NC) + + # Partial Attribute Sets don't require GET_ALL_CHANGES rights, so we + # expect the following to succeed + self._test_repl_single_obj(repl_obj=self.ou, + expected_error=None, + partial_attribute_set=self.get_partial_attribute_set()) + self._test_repl_full(expected_error=None, + partial_attribute_set=self.get_partial_attribute_set()) + + def test_repl_getallchanges_userpriv(self): + """ + Tests various replication requests made by a user with only + GET_ALL_CHANGES rights. Note that assigning these rights is possible, + but doesn't make a lot of sense. We test it anyway for consistency. + """ + + # Assign the user GET_ALL_CHANGES rights + self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod_get_all_changes) + + # We can expect to get the same responses as an unprivileged user, + # i.e. we have permission to see the results, but don't have permission + # to ask + self.test_repl_no_userpriv() + + def test_repl_both_userpriv(self): + """ + Tests various replication requests made by a privileged user (i.e. has + both GET_CHANGES and GET_ALL_CHANGES). We expect any valid requests + to be accepted. + """ + + # Assign the user both GET_CHANGES and GET_ALL_CHANGES rights + both_rights = self.acl_mod_get_changes + self.acl_mod_get_all_changes + self.sd_utils.dacl_add_ace(self.base_dn, both_rights) + + self._test_repl_single_obj(repl_obj=self.ou, + expected_error=None) + + self._test_repl_secret(repl_obj=self.ou, + expected_error=werror.WERR_DS_DRA_DB_ERROR) + self._test_repl_secret(repl_obj=self.user_dn, + expected_error=werror.WERR_DS_DRA_DB_ERROR) + self._test_repl_secret(repl_obj=self.user_dn, + dest_dsa=self.ldb_dc1.get_ntds_GUID(), + expected_error=None) + + self._test_repl_full(expected_error=None) + self._test_repl_full_on_ou(expected_error=werror.WERR_DS_CANT_FIND_EXPECTED_NC) + + self._test_repl_single_obj(repl_obj=self.ou, + expected_error=None, + partial_attribute_set=self.get_partial_attribute_set()) + self._test_repl_full(expected_error=None, + partial_attribute_set=self.get_partial_attribute_set()) + + def test_repl_no_userpriv(self): + """ + Tests various replication requests made by a unprivileged user. + We expect all these requests to be rejected. + """ + + self._test_repl_single_obj(repl_obj=self.ou, + expected_error=werror.WERR_DS_DRA_BAD_DN) + + self._test_repl_secret(repl_obj=self.ou, + expected_error=werror.WERR_DS_DRA_BAD_DN) + self._test_repl_secret(repl_obj=self.user_dn, + expected_error=werror.WERR_DS_DRA_BAD_DN) + self._test_repl_secret(repl_obj=self.user_dn, + dest_dsa=self.ldb_dc1.get_ntds_GUID(), + expected_error=werror.WERR_DS_DRA_BAD_DN) + + self._test_repl_full(expected_error=werror.WERR_DS_DRA_ACCESS_DENIED) + self._test_repl_full_on_ou(expected_error=werror.WERR_DS_DRA_BAD_DN) + + self._test_repl_single_obj(repl_obj=self.ou, + expected_error=werror.WERR_DS_DRA_BAD_DN, + partial_attribute_set=self.get_partial_attribute_set()) + self._test_repl_full(expected_error=werror.WERR_DS_DRA_ACCESS_DENIED, + partial_attribute_set=self.get_partial_attribute_set()) + - req8 = self._exop_req8(dest_dsa=None, - invocation_id=self.ldb_dc1.get_invocation_id(), - nc_dn_str=self.ldb_dc1.get_default_basedn(), - exop=drsuapi.DRSUAPI_EXOP_NONE, - replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP) - (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8) - self.assertEqual(ctr.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_NONE) |