summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorGary Lockyer <gary@catalyst.net.nz>2018-10-18 13:53:55 +1300
committerAndrew Bartlett <abartlet@samba.org>2018-11-20 22:14:17 +0100
commitfa3ea1cfc181dc9119c5fb0c6a1a12a7d31bfac2 (patch)
tree4de890a532202471b9cea652b4fa7e23e010ba02 /python
parent8d8303b0aecb472de33f7dbd87c80e939e98034a (diff)
downloadsamba-fa3ea1cfc181dc9119c5fb0c6a1a12a7d31bfac2.tar.gz
tests samr: Extra tests for samr_EnumDomainUserss
Add extra tests to test the content returned by samr_EnumDomainUsers, and tests for the result caching added in the following commit. Signed-off-by: Gary Lockyer <gary@catalyst.net.nz> Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Diffstat (limited to 'python')
-rw-r--r--python/samba/tests/dcerpc/sam.py144
1 files changed, 144 insertions, 0 deletions
diff --git a/python/samba/tests/dcerpc/sam.py b/python/samba/tests/dcerpc/sam.py
index 11ed9b9287f..ab710861383 100644
--- a/python/samba/tests/dcerpc/sam.py
+++ b/python/samba/tests/dcerpc/sam.py
@@ -604,3 +604,147 @@ class SamrTests(RpcInterfaceTestCase):
check_results(expected, actual)
self.delete_dns(dns)
+
+ def test_EnumDomainUsers(self):
+ def check_results(expected, actual):
+ for (e, a) in zip(expected, actual):
+ self.assertTrue(isinstance(a, samr.SamEntry))
+ self.assertEquals(
+ str(e["sAMAccountName"]), str(a.name.string))
+
+ # Create four users
+ # to ensure that we have the minimum needed for the tests.
+ dns = self.create_users([1, 2, 3, 4])
+
+ #
+ # Get the expected results by querying the samdb database directly.
+ # We do this rather than use a list of expected results as this runs
+ # with other tests so we do not have a known fixed list of elements
+ select = "(objectClass=user)"
+ attributes = ["sAMAccountName", "objectSID", "userAccountConrol"]
+ unfiltered = self.samdb.search(expression=select, attrs=attributes)
+ filtered = self.filter_domain(unfiltered)
+ self.assertTrue(len(filtered) > 4)
+
+ # Sort the expected results by rid
+ expected = sorted(list(filtered), key=rid)
+
+ #
+ # Perform EnumDomainUsers with max_size greater than required for the
+ # expected number of results. We should get all the results.
+ #
+ max_size = calc_max_size(len(expected) + 10)
+ (resume_handle, actual, num_entries) = self.conn.EnumDomainUsers(
+ self.domain_handle, 0, 0, max_size)
+ self.assertEquals(len(expected), num_entries)
+ check_results(expected, actual.entries)
+
+ #
+ # Perform EnumDomainUsers with size set to so that it contains
+ # 4 entries.
+ max_size = calc_max_size(4)
+ (resume_handle, actual, num_entries) = self.conn.EnumDomainUsers(
+ self.domain_handle, 0, 0, max_size)
+ self.assertEquals(4, num_entries)
+ check_results(expected[:4], actual.entries)
+
+ #
+ # Try calling with resume_handle greater than number of entries
+ # Should return no results and a resume handle of 0
+ rh = len(expected)
+ max_size = calc_max_size(1)
+ self.conn.Close(self.handle)
+ (resume_handle, a, num_entries) = self.conn.EnumDomainUsers(
+ self.domain_handle, rh, 0, max_size)
+
+ self.assertEquals(0, num_entries)
+ self.assertEquals(0, resume_handle)
+
+ #
+ # Enumerate through the domain users one element at a time.
+ # We should get all the results.
+ #
+ actual = []
+ max_size = calc_max_size(1)
+ (resume_handle, a, num_entries) = self.conn.EnumDomainUsers(
+ self.domain_handle, 0, 0, max_size)
+ while resume_handle:
+ self.assertEquals(1, num_entries)
+ actual.append(a.entries[0])
+ (resume_handle, a, num_entries) = self.conn.EnumDomainUsers(
+ self.domain_handle, resume_handle, 0, max_size)
+ if num_entries:
+ actual.append(a.entries[0])
+
+ self.assertEquals(len(expected), len(actual))
+ check_results(expected, actual)
+
+ #
+ # Check that the cached results are being returned.
+ # Obtain a new resume_handle and insert new entries into the
+ # into the DB. As the entries were added after the results were cached
+ # they should not show up in the returned results.
+ #
+ actual = []
+ max_size = calc_max_size(1)
+ (resume_handle, a, num_entries) = self.conn.EnumDomainUsers(
+ self.domain_handle, 0, 0, max_size)
+ extra_dns = self.create_users([1000, 1002, 1003, 1004])
+ while resume_handle:
+ self.assertEquals(1, num_entries)
+ actual.append(a.entries[0])
+ (resume_handle, a, num_entries) = self.conn.EnumDomainUsers(
+ self.domain_handle, resume_handle, 0, max_size)
+ if num_entries:
+ actual.append(a.entries[0])
+
+ self.assertEquals(len(expected), len(actual))
+ check_results(expected, actual)
+
+ #
+ # Perform EnumDomainUsers, we should read the newly added groups
+ # As resume_handle is zero, the results will be read from disk.
+ #
+ max_size = calc_max_size(len(expected) + len(extra_dns) + 10)
+ (resume_handle, actual, num_entries) = self.conn.EnumDomainUsers(
+ self.domain_handle, 0, 0, max_size)
+ self.assertEquals(len(expected) + len(extra_dns), num_entries)
+
+ #
+ # Get a new expected result set by querying the database directly
+ unfiltered01 = self.samdb.search(expression=select, attrs=attributes)
+ filtered01 = self.filter_domain(unfiltered01)
+ self.assertTrue(len(filtered01) > len(expected))
+
+ # Sort the expected results by rid
+ expected01 = sorted(list(filtered01), key=rid)
+
+ #
+ # Now check that we read the new entries.
+ #
+ self.assertEquals(len(expected01), num_entries)
+ check_results(expected01, actual.entries)
+
+ #
+ # Check that deleted results are handled correctly.
+ # Obtain a new resume_handle and delete entries from the DB.
+ # We will not see the deleted entries in the result set, as details
+ # need to be read from disk. Only the object GUID's are cached.
+ #
+ actual = []
+ max_size = calc_max_size(1)
+ (resume_handle, a, num_entries) = self.conn.EnumDomainUsers(
+ self.domain_handle, 0, 0, max_size)
+ self.delete_dns(extra_dns)
+ while resume_handle and num_entries:
+ self.assertEquals(1, num_entries)
+ actual.append(a.entries[0])
+ (resume_handle, a, num_entries) = self.conn.EnumDomainUsers(
+ self.domain_handle, resume_handle, 0, max_size)
+ if num_entries:
+ actual.append(a.entries[0])
+
+ self.assertEquals(len(expected), len(actual))
+ check_results(expected, actual)
+
+ self.delete_dns(dns)