summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorGary Lockyer <gary@catalyst.net.nz>2018-10-12 11:21:10 +1300
committerAndrew Bartlett <abartlet@samba.org>2018-11-20 22:14:17 +0100
commit7dd7800a88e6ece0606d6d94718318e2dc067d58 (patch)
tree87bd47954ed3cd0c95198b4d9db21d8b89371f4e /python
parentfc1a16ff61ac57079d7e31fc2266b35b2fdeaae0 (diff)
downloadsamba-7dd7800a88e6ece0606d6d94718318e2dc067d58.tar.gz
test samr: Extra tests for samr_EnumDomainGroups
Add extra tests to test the content returned by samr_EnumDomainGroups, and tests for the result caching added in the following commit. Signed-off-by: Gary Lockyer <gary@catalyst.net.nz> Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Diffstat (limited to 'python')
-rw-r--r--python/samba/tests/dcerpc/sam.py169
1 files changed, 169 insertions, 0 deletions
diff --git a/python/samba/tests/dcerpc/sam.py b/python/samba/tests/dcerpc/sam.py
index f709ba48950..11ed9b9287f 100644
--- a/python/samba/tests/dcerpc/sam.py
+++ b/python/samba/tests/dcerpc/sam.py
@@ -32,6 +32,7 @@ from samba.dsdb import (
GTYPE_SECURITY_UNIVERSAL_GROUP,
GTYPE_SECURITY_GLOBAL_GROUP)
from samba import generate_random_password
+from samba.ndr import ndr_unpack
import os
@@ -40,6 +41,24 @@ def toArray(handle, array, num_entries):
return [(entry.idx, entry.name) for entry in array.entries[:num_entries]]
+# Extract the rid from an ldb message, assumes that the message has a
+# objectSID attribute
+#
+def rid(msg):
+ sid = ndr_unpack(security.dom_sid, msg["objectSID"][0])
+ (_, rid) = sid.split()
+ return rid
+
+
+# Calculate the request size for EnumDomainUsers and EnumDomainGroups calls
+# to hold the specified number of entries.
+# We use the w2k3 element size value of 54, code under test
+# rounds this up i.e. (1+(max_size/SAMR_ENUM_USERS_MULTIPLIER))
+#
+def calc_max_size(num_entries):
+ return (num_entries - 1) * 54
+
+
class SamrTests(RpcInterfaceTestCase):
def setUp(self):
@@ -72,6 +91,18 @@ class SamrTests(RpcInterfaceTestCase):
self.domain_handle = self.conn.OpenDomain(
self.handle, security.SEC_FLAG_MAXIMUM_ALLOWED, self.domain_sid)
+ # Filter a list of records, removing those that are not part of the
+ # current domain.
+ #
+ def filter_domain(self, unfiltered):
+ def sid(msg):
+ sid = ndr_unpack(security.dom_sid, msg["objectSID"][0])
+ (x, _) = sid.split()
+ return x
+
+ dom_sid = security.dom_sid(self.samdb.get_domain_sid())
+ return [x for x in unfiltered if sid(x) == dom_sid]
+
def test_connect5(self):
(level, info, handle) =\
self.conn.Connect5(None, 0, 1, samr.ConnectInfo1())
@@ -435,3 +466,141 @@ class SamrTests(RpcInterfaceTestCase):
5, check_results, select, attributes, self.create_groups)
self.delete_dns(dns)
+
+ def test_EnumDomainGroups(self):
+ def check_results(expected, actual):
+ for (e, a) in zip(expected, actual):
+ self.assertTrue(isinstance(a, samr.SamEntry))
+ self.assertEquals(
+ str(e["sAMAccountName"]), str(a.name.string))
+
+ # Create four groups
+ # to ensure that we have the minimum needed for the tests.
+ dns = self.create_groups([1, 2, 3, 4])
+
+ #
+ # Get the expected results by querying the samdb database directly.
+ # We do this rather than use a list of expected results as this runs
+ # with other tests so we do not have a known fixed list of elements
+ select = "(&(|(groupType=%d)(groupType=%d))(objectClass=group))" % (
+ GTYPE_SECURITY_UNIVERSAL_GROUP,
+ GTYPE_SECURITY_GLOBAL_GROUP)
+ attributes = ["sAMAccountName", "objectSID"]
+ unfiltered = self.samdb.search(expression=select, attrs=attributes)
+ filtered = self.filter_domain(unfiltered)
+ self.assertTrue(len(filtered) > 4)
+
+ # Sort the expected results by rid
+ expected = sorted(list(filtered), key=rid)
+
+ #
+ # Perform EnumDomainGroups with max size greater than the expected
+ # number of results. Allow for an extra 10 entries
+ #
+ max_size = calc_max_size(len(expected) + 10)
+ (resume_handle, actual, num_entries) = self.conn.EnumDomainGroups(
+ self.domain_handle, 0, max_size)
+ self.assertEquals(len(expected), num_entries)
+ check_results(expected, actual.entries)
+
+ #
+ # Perform EnumDomainGroups with size set to so that it contains
+ # 4 entries.
+ #
+ max_size = calc_max_size(4)
+ (resume_handle, actual, num_entries) = self.conn.EnumDomainGroups(
+ self.domain_handle, 0, max_size)
+ self.assertEquals(4, num_entries)
+ check_results(expected[:4], actual.entries)
+
+ #
+ # Try calling with resume_handle greater than number of entries
+ # Should return no results and a resume handle of 0
+ max_size = calc_max_size(1)
+ rh = len(expected)
+ self.conn.Close(self.handle)
+ (resume_handle, a, num_entries) = self.conn.EnumDomainGroups(
+ self.domain_handle, rh, max_size)
+
+ self.assertEquals(0, num_entries)
+ self.assertEquals(0, resume_handle)
+
+ #
+ # Enumerate through the domain groups one element at a time.
+ #
+ max_size = calc_max_size(1)
+ actual = []
+ (resume_handle, a, num_entries) = self.conn.EnumDomainGroups(
+ self.domain_handle, 0, max_size)
+ while resume_handle:
+ self.assertEquals(1, num_entries)
+ actual.append(a.entries[0])
+ (resume_handle, a, num_entries) = self.conn.EnumDomainGroups(
+ self.domain_handle, resume_handle, max_size)
+ if num_entries:
+ actual.append(a.entries[0])
+
+ #
+ # Check that the cached results are being returned.
+ # Obtain a new resume_handle and insert new entries into the
+ # into the DB
+ #
+ actual = []
+ max_size = calc_max_size(1)
+ (resume_handle, a, num_entries) = self.conn.EnumDomainGroups(
+ self.domain_handle, 0, max_size)
+ extra_dns = self.create_groups([1000, 1002, 1003, 1004])
+ while resume_handle:
+ self.assertEquals(1, num_entries)
+ actual.append(a.entries[0])
+ (resume_handle, a, num_entries) = self.conn.EnumDomainGroups(
+ self.domain_handle, resume_handle, max_size)
+ if num_entries:
+ actual.append(a.entries[0])
+
+ self.assertEquals(len(expected), len(actual))
+ check_results(expected, actual)
+
+ #
+ # Perform EnumDomainGroups, we should read the newly added domains
+ #
+ max_size = calc_max_size(len(expected) + len(extra_dns) + 10)
+ (resume_handle, actual, num_entries) = self.conn.EnumDomainGroups(
+ self.domain_handle, 0, max_size)
+ self.assertEquals(len(expected) + len(extra_dns), num_entries)
+
+ #
+ # Get a new expected result set by querying the database directly
+ unfiltered01 = self.samdb.search(expression=select, attrs=attributes)
+ filtered01 = self.filter_domain(unfiltered01)
+ self.assertTrue(len(filtered01) > len(expected))
+
+ # Sort the expected results by rid
+ expected01 = sorted(list(filtered01), key=rid)
+
+ #
+ # Now check that we read the new entries.
+ #
+ check_results(expected01, actual.entries)
+
+ #
+ # Check that deleted results are handled correctly.
+ # Obtain a new resume_handle and delete entries from the DB.
+ #
+ actual = []
+ max_size = calc_max_size(1)
+ (resume_handle, a, num_entries) = self.conn.EnumDomainGroups(
+ self.domain_handle, 0, max_size)
+ self.delete_dns(extra_dns)
+ while resume_handle and num_entries:
+ self.assertEquals(1, num_entries)
+ actual.append(a.entries[0])
+ (resume_handle, a, num_entries) = self.conn.EnumDomainGroups(
+ self.domain_handle, resume_handle, max_size)
+ if num_entries:
+ actual.append(a.entries[0])
+
+ self.assertEquals(len(expected), len(actual))
+ check_results(expected, actual)
+
+ self.delete_dns(dns)