diff options
author | Gary Lockyer <gary@catalyst.net.nz> | 2018-10-12 11:21:10 +1300 |
---|---|---|
committer | Andrew Bartlett <abartlet@samba.org> | 2018-11-20 22:14:17 +0100 |
commit | 7dd7800a88e6ece0606d6d94718318e2dc067d58 (patch) | |
tree | 87bd47954ed3cd0c95198b4d9db21d8b89371f4e /python | |
parent | fc1a16ff61ac57079d7e31fc2266b35b2fdeaae0 (diff) | |
download | samba-7dd7800a88e6ece0606d6d94718318e2dc067d58.tar.gz |
test samr: Extra tests for samr_EnumDomainGroups
Add extra tests to test the content returned by samr_EnumDomainGroups,
and tests for the result caching added in the following commit.
Signed-off-by: Gary Lockyer <gary@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Diffstat (limited to 'python')
-rw-r--r-- | python/samba/tests/dcerpc/sam.py | 169 |
1 files changed, 169 insertions, 0 deletions
diff --git a/python/samba/tests/dcerpc/sam.py b/python/samba/tests/dcerpc/sam.py index f709ba48950..11ed9b9287f 100644 --- a/python/samba/tests/dcerpc/sam.py +++ b/python/samba/tests/dcerpc/sam.py @@ -32,6 +32,7 @@ from samba.dsdb import ( GTYPE_SECURITY_UNIVERSAL_GROUP, GTYPE_SECURITY_GLOBAL_GROUP) from samba import generate_random_password +from samba.ndr import ndr_unpack import os @@ -40,6 +41,24 @@ def toArray(handle, array, num_entries): return [(entry.idx, entry.name) for entry in array.entries[:num_entries]] +# Extract the rid from an ldb message, assumes that the message has a +# objectSID attribute +# +def rid(msg): + sid = ndr_unpack(security.dom_sid, msg["objectSID"][0]) + (_, rid) = sid.split() + return rid + + +# Calculate the request size for EnumDomainUsers and EnumDomainGroups calls +# to hold the specified number of entries. +# We use the w2k3 element size value of 54, code under test +# rounds this up i.e. (1+(max_size/SAMR_ENUM_USERS_MULTIPLIER)) +# +def calc_max_size(num_entries): + return (num_entries - 1) * 54 + + class SamrTests(RpcInterfaceTestCase): def setUp(self): @@ -72,6 +91,18 @@ class SamrTests(RpcInterfaceTestCase): self.domain_handle = self.conn.OpenDomain( self.handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid) + # Filter a list of records, removing those that are not part of the + # current domain. + # + def filter_domain(self, unfiltered): + def sid(msg): + sid = ndr_unpack(security.dom_sid, msg["objectSID"][0]) + (x, _) = sid.split() + return x + + dom_sid = security.dom_sid(self.samdb.get_domain_sid()) + return [x for x in unfiltered if sid(x) == dom_sid] + def test_connect5(self): (level, info, handle) =\ self.conn.Connect5(None, 0, 1, samr.ConnectInfo1()) @@ -435,3 +466,141 @@ class SamrTests(RpcInterfaceTestCase): 5, check_results, select, attributes, self.create_groups) self.delete_dns(dns) + + def test_EnumDomainGroups(self): + def check_results(expected, actual): + for (e, a) in zip(expected, actual): + self.assertTrue(isinstance(a, samr.SamEntry)) + self.assertEquals( + str(e["sAMAccountName"]), str(a.name.string)) + + # Create four groups + # to ensure that we have the minimum needed for the tests. + dns = self.create_groups([1, 2, 3, 4]) + + # + # Get the expected results by querying the samdb database directly. + # We do this rather than use a list of expected results as this runs + # with other tests so we do not have a known fixed list of elements + select = "(&(|(groupType=%d)(groupType=%d))(objectClass=group))" % ( + GTYPE_SECURITY_UNIVERSAL_GROUP, + GTYPE_SECURITY_GLOBAL_GROUP) + attributes = ["sAMAccountName", "objectSID"] + unfiltered = self.samdb.search(expression=select, attrs=attributes) + filtered = self.filter_domain(unfiltered) + self.assertTrue(len(filtered) > 4) + + # Sort the expected results by rid + expected = sorted(list(filtered), key=rid) + + # + # Perform EnumDomainGroups with max size greater than the expected + # number of results. Allow for an extra 10 entries + # + max_size = calc_max_size(len(expected) + 10) + (resume_handle, actual, num_entries) = self.conn.EnumDomainGroups( + self.domain_handle, 0, max_size) + self.assertEquals(len(expected), num_entries) + check_results(expected, actual.entries) + + # + # Perform EnumDomainGroups with size set to so that it contains + # 4 entries. + # + max_size = calc_max_size(4) + (resume_handle, actual, num_entries) = self.conn.EnumDomainGroups( + self.domain_handle, 0, max_size) + self.assertEquals(4, num_entries) + check_results(expected[:4], actual.entries) + + # + # Try calling with resume_handle greater than number of entries + # Should return no results and a resume handle of 0 + max_size = calc_max_size(1) + rh = len(expected) + self.conn.Close(self.handle) + (resume_handle, a, num_entries) = self.conn.EnumDomainGroups( + self.domain_handle, rh, max_size) + + self.assertEquals(0, num_entries) + self.assertEquals(0, resume_handle) + + # + # Enumerate through the domain groups one element at a time. + # + max_size = calc_max_size(1) + actual = [] + (resume_handle, a, num_entries) = self.conn.EnumDomainGroups( + self.domain_handle, 0, max_size) + while resume_handle: + self.assertEquals(1, num_entries) + actual.append(a.entries[0]) + (resume_handle, a, num_entries) = self.conn.EnumDomainGroups( + self.domain_handle, resume_handle, max_size) + if num_entries: + actual.append(a.entries[0]) + + # + # Check that the cached results are being returned. + # Obtain a new resume_handle and insert new entries into the + # into the DB + # + actual = [] + max_size = calc_max_size(1) + (resume_handle, a, num_entries) = self.conn.EnumDomainGroups( + self.domain_handle, 0, max_size) + extra_dns = self.create_groups([1000, 1002, 1003, 1004]) + while resume_handle: + self.assertEquals(1, num_entries) + actual.append(a.entries[0]) + (resume_handle, a, num_entries) = self.conn.EnumDomainGroups( + self.domain_handle, resume_handle, max_size) + if num_entries: + actual.append(a.entries[0]) + + self.assertEquals(len(expected), len(actual)) + check_results(expected, actual) + + # + # Perform EnumDomainGroups, we should read the newly added domains + # + max_size = calc_max_size(len(expected) + len(extra_dns) + 10) + (resume_handle, actual, num_entries) = self.conn.EnumDomainGroups( + self.domain_handle, 0, max_size) + self.assertEquals(len(expected) + len(extra_dns), num_entries) + + # + # Get a new expected result set by querying the database directly + unfiltered01 = self.samdb.search(expression=select, attrs=attributes) + filtered01 = self.filter_domain(unfiltered01) + self.assertTrue(len(filtered01) > len(expected)) + + # Sort the expected results by rid + expected01 = sorted(list(filtered01), key=rid) + + # + # Now check that we read the new entries. + # + check_results(expected01, actual.entries) + + # + # Check that deleted results are handled correctly. + # Obtain a new resume_handle and delete entries from the DB. + # + actual = [] + max_size = calc_max_size(1) + (resume_handle, a, num_entries) = self.conn.EnumDomainGroups( + self.domain_handle, 0, max_size) + self.delete_dns(extra_dns) + while resume_handle and num_entries: + self.assertEquals(1, num_entries) + actual.append(a.entries[0]) + (resume_handle, a, num_entries) = self.conn.EnumDomainGroups( + self.domain_handle, resume_handle, max_size) + if num_entries: + actual.append(a.entries[0]) + + self.assertEquals(len(expected), len(actual)) + check_results(expected, actual) + + self.delete_dns(dns) |