summaryrefslogtreecommitdiff
path: root/source4
diff options
context:
space:
mode:
authorAaron Haslett <aaronhaslett@catalyst.net.nz>2019-02-19 14:33:33 +1300
committerAndrew Bartlett <abartlet@samba.org>2019-04-11 04:17:11 +0000
commit5d8895f347ca0005240ec166fec4eb875f9cd356 (patch)
treef81ecc51d83dbb28a97fc792dbdf0a7f70fe0a06 /source4
parent350fc49e94525232c3556a3ace108f6e1447a490 (diff)
downloadsamba-5d8895f347ca0005240ec166fec4eb875f9cd356.tar.gz
repl: test for schema object and LA repl across chunks
During replication, transmission of objects and linked attributes are split into chunks. These two tests check behavioural consistency across chunks for regular schema objects and linked attributes. Signed-off-by: Aaron Haslett <aaronhaslett@catalyst.net.nz> Reviewed-by: Andrew Bartlett <abartlet@samba.org> Reviewed-by: Garming Sam <garming@catalyst.net.nz>
Diffstat (limited to 'source4')
-rwxr-xr-xsource4/selftest/tests.py6
-rw-r--r--source4/setup/schema_samba4.ldif1
-rw-r--r--source4/torture/drs/python/drs_base.py20
-rw-r--r--source4/torture/drs/python/getnc_schema.py308
4 files changed, 327 insertions, 8 deletions
diff --git a/source4/selftest/tests.py b/source4/selftest/tests.py
index d5620d5947c..6fc5471924b 100755
--- a/source4/selftest/tests.py
+++ b/source4/selftest/tests.py
@@ -1016,6 +1016,12 @@ planoldpythontestsuite("%s:local" % env, "samba_tool_drs",
name="samba4.drs.samba_tool_drs.python(%s)" % env,
environ={'DC1': '$DC_SERVER', 'DC2': '$SERVER'},
extra_args=['-U$DOMAIN/$DC_USERNAME%$DC_PASSWORD'])
+planoldpythontestsuite(env, "getnc_schema",
+ extra_path=[os.path.join(samba4srcdir, 'torture/drs/python')],
+ name="samba4.drs.getnc_schema.python(%s)" % env,
+ environ={'DC1': "$DC_SERVER", 'DC2': '$SERVER',
+ "PLEASE_BREAK_MY_WINDOWS": "1"},
+ extra_args=['-U$DOMAIN/$DC_USERNAME%$DC_PASSWORD'])
for env in ['vampire_dc', 'promoted_dc']:
planoldpythontestsuite("%s:local" % env, "samba_tool_drs",
diff --git a/source4/setup/schema_samba4.ldif b/source4/setup/schema_samba4.ldif
index a8467584365..a31b67750d4 100644
--- a/source4/setup/schema_samba4.ldif
+++ b/source4/setup/schema_samba4.ldif
@@ -31,6 +31,7 @@
## 1.3.6.1.4.1.7165.4.6.2.5.x - repl_schema.py
## 1.3.6.1.4.1.7165.4.6.2.6.x - ldap_schema.py
## 1.3.6.1.4.1.7165.4.6.2.7.x - dsdb_schema_info.py
+## 1.3.6.1.4.1.7165.4.6.2.8.x - getnc_schema.py
## 1.3.6.1.4.1.7165.4.255.x - mapped OIDs due to conflicts between AD and standards-track
#
diff --git a/source4/torture/drs/python/drs_base.py b/source4/torture/drs/python/drs_base.py
index 5eaad9edc1c..0a48d63676c 100644
--- a/source4/torture/drs/python/drs_base.py
+++ b/source4/torture/drs/python/drs_base.py
@@ -56,11 +56,11 @@ class DrsBaseTestCase(SambaToolCmdTest):
creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
# connect to DCs
- url_dc = samba.tests.env_get_var_value("DC1")
- (self.ldb_dc1, self.info_dc1) = samba.tests.connect_samdb_ex(url_dc,
+ self.url_dc1 = samba.tests.env_get_var_value("DC1")
+ (self.ldb_dc1, self.info_dc1) = samba.tests.connect_samdb_ex(self.url_dc1,
ldap_only=True)
- url_dc = samba.tests.env_get_var_value("DC2")
- (self.ldb_dc2, self.info_dc2) = samba.tests.connect_samdb_ex(url_dc,
+ self.url_dc2 = samba.tests.env_get_var_value("DC2")
+ (self.ldb_dc2, self.info_dc2) = samba.tests.connect_samdb_ex(self.url_dc2,
ldap_only=True)
self.test_ldb_dc = self.ldb_dc1
@@ -114,11 +114,15 @@ class DrsBaseTestCase(SambaToolCmdTest):
def _samba_tool_cmd_list(self, drs_command):
# make command line credentials string
- ccache_name = self.get_creds_ccache_name()
+ # If test runs on windows then it can provide its own auth string
+ if hasattr(self, 'cmdline_auth'):
+ cmdline_auth = self.cmdline_auth
+ else:
+ ccache_name = self.get_creds_ccache_name()
- # Tunnel the command line credentials down to the
- # subcommand to avoid a new kinit
- cmdline_auth = "--krb5-ccache=%s" % ccache_name
+ # Tunnel the command line credentials down to the
+ # subcommand to avoid a new kinit
+ cmdline_auth = "--krb5-ccache=%s" % ccache_name
# bin/samba-tool drs <drs_command> <cmdline_auth>
return ["drs", drs_command, cmdline_auth]
diff --git a/source4/torture/drs/python/getnc_schema.py b/source4/torture/drs/python/getnc_schema.py
new file mode 100644
index 00000000000..5b67b29d981
--- /dev/null
+++ b/source4/torture/drs/python/getnc_schema.py
@@ -0,0 +1,308 @@
+import drs_base
+import ldb
+import time
+from samba.dcerpc import misc
+from samba.drs_utils import drs_Replicate, drsException
+import samba
+import random
+import time
+import os
+
+break_me = os.getenv("PLEASE_BREAK_MY_WINDOWS") == "1"
+assert break_me, ("This test breaks Windows active directory after "
+ "a few runs. Set PLEASE_BREAK_MY_WINDOWS=1 to run.")
+
+# This test runs against Windows. To run, set up two Windows AD DCs, join one
+# to the other, and make sure the passwords are the same. SMB_CONF_PATH must
+# also be set to any smb.conf file. Set DC1 to the PDC's hostname, and DC2 to
+# the join'd DC's hostname. Example:
+# PLEASE_BREAK_MY_WINDOWS=1
+# DC1=pdc DC2=joindc
+# SMB_CONF_PATH=st/ad_dc/etc/smb.conf
+# PYTHONPATH=$PYTHONPATH:./source4/torture/drs/python
+# python3 ./source4/scripting/bin/subunitrun getnc_schema
+# -UAdministrator%Password
+
+class SchemaReplicationTests(drs_base.DrsBaseTestCase):
+
+ def setUp(self):
+ super(SchemaReplicationTests, self).setUp()
+ self.creds = self.get_credentials()
+ self.cmdline_auth = "-U{}%{}".format(self.creds.get_username(),
+ self.creds.get_password())
+
+ self.from_ldb = self.ldb_dc1
+ self.dest_ldb = self.ldb_dc2
+ self._disable_inbound_repl(self.url_dc1)
+ self._disable_all_repl(self.url_dc1)
+ self.free_offset = 0
+
+ def tearDown(self):
+ self._enable_inbound_repl(self.url_dc1)
+ self._enable_all_repl(self.url_dc1)
+
+ def do_repl(self, partition_dn):
+ self._enable_inbound_repl(self.url_dc1)
+ self._enable_all_repl(self.url_dc1)
+
+ samba_tool_cmd = ["drs", "replicate", self.url_dc2, self.url_dc1]
+ samba_tool_cmd += [partition_dn]
+ username = self.creds.get_username()
+ password = self.creds.get_password()
+ samba_tool_cmd += ["-U{0}%{1}".format(username, password)]
+
+ (result, out, err) = self.runsubcmd(*samba_tool_cmd)
+
+ try:
+ self.assertCmdSuccess(result, out, err)
+ except AssertionError:
+ print("Failed repl, retrying in 10s")
+ time.sleep(10)
+ (result, out, err) = self.runsubcmd(*samba_tool_cmd)
+
+ self._disable_inbound_repl(self.url_dc1)
+ self._disable_all_repl(self.url_dc1)
+
+ self.assertCmdSuccess(result, out, err)
+
+ # Get a unique prefix for some search expression like "([att]=[pref]{i}*)"
+ def get_unique(self, expr_templ):
+ found = True
+ while found:
+ i = random.randint(0, 65535)
+ res = self.from_ldb.search(base=self.schema_dn,
+ scope=ldb.SCOPE_SUBTREE,
+ expression=expr_templ.format(i=i))
+ found = len(res) > 0
+
+ return str(i)
+
+ def unique_gov_id_prefix(self):
+ prefix = "1.3.6.1.4.1.7165.4.6.2.8."
+ return prefix + self.get_unique("(governsId=" + prefix + "{i}.*)")
+
+ def unique_cn_prefix(self, prefix="testobj"):
+ return prefix + self.get_unique("(cn=" + prefix + "{i}x*)") + "x"
+
+ # Make test schema classes linked to each other in a line, then modify
+ # them in reverse order so when we repl, a link crosses the chunk
+ # boundary. Chunk size is 133 by default so we do 150.
+ def test_poss_superiors_across_chunk(self):
+ num_schema_objects_to_add = 150
+ class_name = self.unique_cn_prefix()
+
+ ldif_template = """
+dn: CN={class_name}{i},{schema_dn}
+objectClass: top
+objectClass: classSchema
+adminDescription: {class_name}{i}
+adminDisplayName: {class_name}{i}
+cn: {class_name}{i}
+governsId: {gov_id}.{i}
+instanceType: 4
+objectClassCategory: 1
+systemFlags: 16
+systemOnly: FALSE
+"""
+
+ ldif_kwargs = {'class_name': class_name,
+ 'schema_dn': self.schema_dn}
+ gov_id = self.unique_gov_id_prefix()
+ ldif = ldif_template.format(i=0, gov_id=gov_id, **ldif_kwargs)
+ self.from_ldb.add_ldif(ldif)
+
+ ldif_template += "systemPossSuperiors: {possSup}\n"
+
+ ids = list(range(num_schema_objects_to_add))
+ got_no_such_attrib = False
+ for i in ids[1:]:
+ last_class_name = class_name + str(i-1)
+ ldif = ldif_template.format(i=i, gov_id=gov_id,
+ possSup=last_class_name,
+ **ldif_kwargs)
+
+ try:
+ self.from_ldb.add_ldif(ldif)
+ if got_no_such_attrib:
+ self.from_ldb.set_schema_update_now()
+ except ldb.LdbError as e:
+ if e.args[0] != ldb.ERR_NO_SUCH_ATTRIBUTE:
+ self.fail(e)
+ if got_no_such_attrib:
+ self.fail(("got NO_SUCH_ATTRIB even after "
+ "setting schemaUpdateNow", str(e)))
+ print("got NO_SUCH_ATTRIB, trying schemaUpdateNow")
+ got_no_such_attrib = True
+ self.from_ldb.set_schema_update_now()
+ self.from_ldb.add_ldif(ldif)
+ self.from_ldb.set_schema_update_now()
+
+ ldif_template = """
+dn: CN={class_name}{i},{schema_dn}
+changetype: modify
+replace: adminDescription
+adminDescription: new_description
+"""
+
+ for i in reversed(ids):
+ ldif = ldif_template.format(i=i, **ldif_kwargs)
+ self.from_ldb.modify_ldif(ldif)
+
+ self.do_repl(self.schema_dn)
+
+ dn_templ = "CN={class_name}{i},{schema_dn}"
+ for i in ids:
+ dn = dn_templ.format(i=i, **ldif_kwargs)
+ res = self.dest_ldb.search(base=dn, scope=ldb.SCOPE_BASE)
+ self.assertEqual(len(res), 1)
+
+ # Test for method of adding linked attributes in schema partition
+ # required by other tests.
+ def test_create_linked_attribute_in_schema(self):
+ # Make an object outside of the schema partition that we can link to
+ user_name = self.unique_cn_prefix("user")
+ user_dn = "CN={},CN=Users,{}".format(user_name, self.domain_dn)
+
+ ldif_template = """
+dn: {user_dn}
+objectClass: person
+objectClass: user"""
+ ldif = ldif_template.format(user_dn=user_dn)
+ self.from_ldb.add_ldif(ldif)
+
+ # Make test class name unique so test can run multiple times
+ class_name = self.unique_cn_prefix("class")
+
+ kwargs = {'class_name': class_name,
+ 'schema_dn': self.schema_dn,
+ 'user_dn': user_dn}
+
+ # Add an auxiliary schemaClass (cat 3) class and give it managedBy
+ # so we can create schema objects with linked attributes.
+ ldif_template = """
+dn: CN={class_name},{schema_dn}
+objectClass: classSchema
+governsId: {gov_id}.0
+instanceType: 4
+systemFlags: 16
+systemOnly: FALSE
+objectClassCategory: 3
+mayContain: managedBy
+"""
+
+ gov_id = self.unique_gov_id_prefix()
+ ldif = ldif_template.format(gov_id=gov_id, **kwargs)
+ self.from_ldb.add_ldif(ldif)
+
+ # Now make an instance that points back to the user with managedBy,
+ # thus creating an object in the schema with a linked attribute
+ ldif_template = """
+dn: CN=link{class_name},{schema_dn}
+objectClass: classSchema
+objectClass: {class_name}
+instanceType: 4
+governsId: {gov_id}.0
+systemFlags: 16
+managedBy: {user_dn}
+"""
+
+ gov_id = self.unique_gov_id_prefix()
+ ldif = ldif_template.format(gov_id=gov_id, **kwargs)
+ self.from_ldb.add_ldif(ldif)
+
+ # Check link exists on test schema object
+ dn_templ = "CN=link{class_name},{schema_dn}"
+ dn = dn_templ.format(**kwargs)
+ res = self.from_ldb.search(base=dn, scope=ldb.SCOPE_BASE)
+ self.assertEqual(len(res), 1)
+ self.assertIsNotNone(res[0].get("managedBy"))
+ self.assertEqual(str(res[0].get("managedBy")[0]), user_dn)
+
+ # Check backlink on user object
+ res = self.from_ldb.search(base=user_dn, scope=ldb.SCOPE_BASE)
+ self.assertEqual(len(res), 1)
+ managed_objs = res[0].get("managedObjects")
+ self.assertEqual(len(managed_objs), 1)
+ managed_objs = [str(o) for o in managed_objs]
+ self.assertEqual(managed_objs, [dn_templ.format(**kwargs)])
+
+ def test_schema_linked_attributes(self):
+ num_test_objects = 9
+
+ # Make an object outside of the schema partition that we can link to
+ user_name = self.unique_cn_prefix("user")
+ user_dn = "CN={},CN=Users,{}".format(user_name, self.domain_dn)
+
+ ldif_template = """
+dn: {user_dn}
+objectClass: person
+objectClass: user"""
+ ldif = ldif_template.format(user_dn=user_dn)
+ self.from_ldb.add_ldif(ldif)
+
+ self.do_repl(self.domain_dn)
+
+ # Make test object name prefixes unique so test can run multiple times
+ # in a single testenv (can't delete schema objects)
+ class_name = self.unique_cn_prefix("class")
+ link_class_name = self.unique_cn_prefix("linkClass")
+
+ kwargs = {'class_name': class_name,
+ 'schema_dn': self.schema_dn,
+ 'link_class_name': link_class_name,
+ 'user_dn': user_dn}
+
+ # Add an auxiliary schemaClass (cat 3) class and give it managedBy
+ # so we can create schema objects with linked attributes.
+ ldif_template = """
+dn: CN={class_name},{schema_dn}
+objectClass: classSchema
+governsId: {gov_id}.0
+instanceType: 4
+systemFlags: 16
+systemOnly: FALSE
+objectClassCategory: 3
+mayContain: managedBy
+"""
+
+ gov_id = self.unique_gov_id_prefix()
+ ldif = ldif_template.format(gov_id=gov_id, **kwargs)
+ self.from_ldb.add_ldif(ldif)
+
+ # Now make instances that point back to the user with managedBy,
+ # thus creating objects in the schema with linked attributes
+ ldif_template = """
+dn: CN={link_class_name}{i},{schema_dn}
+objectClass: classSchema
+objectClass: {class_name}
+instanceType: 4
+governsId: {gov_id}.0
+systemFlags: 16
+managedBy: {user_dn}
+"""
+
+ id_range = list(range(num_test_objects))
+ for i in id_range:
+ gov_id = self.unique_gov_id_prefix()
+ ldif = ldif_template.format(i=i, gov_id=gov_id, **kwargs)
+ self.from_ldb.add_ldif(ldif)
+
+ self.do_repl(self.schema_dn)
+
+ # Check link exists in each test schema objects at destination DC
+ dn_templ = "CN={link_class_name}{i},{schema_dn}"
+ for i in id_range:
+ dn = dn_templ.format(i=i, **kwargs)
+ res = self.dest_ldb.search(base=dn, scope=ldb.SCOPE_BASE)
+ self.assertEqual(len(res), 1)
+ self.assertIsNotNone(res[0].get("managedBy"))
+ self.assertEqual(str(res[0].get("managedBy")[0]), user_dn)
+
+ # Check backlinks list on user object contains DNs of test objects.
+ res = self.dest_ldb.search(base=user_dn, scope=ldb.SCOPE_BASE)
+ self.assertEqual(len(res), 1)
+ managed_objs = res[0].get("managedObjects")
+ self.assertIsNotNone(managed_objs)
+ managed_objs_set = {str(el) for el in managed_objs}
+ expected = {dn_templ.format(i=i, **kwargs) for i in id_range}
+ self.assertEqual(managed_objs_set, expected)