summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--selftest/knownfail.d/vlv1
-rw-r--r--source4/dsdb/tests/python/vlv.py420
2 files changed, 418 insertions, 3 deletions
diff --git a/selftest/knownfail.d/vlv b/selftest/knownfail.d/vlv
index ee4970cad8b..f187a2ed55e 100644
--- a/selftest/knownfail.d/vlv
+++ b/selftest/knownfail.d/vlv
@@ -1 +1,2 @@
samba4.ldap.vlv.python.*__main__.VLVTests.test_vlv_change_search_expr
+samba4.ldap.vlv.python.*__main__.PagedResultsTests.test_paged_cant_change_controls_data
diff --git a/source4/dsdb/tests/python/vlv.py b/source4/dsdb/tests/python/vlv.py
index 04bd89926cf..8550a38e287 100644
--- a/source4/dsdb/tests/python/vlv.py
+++ b/source4/dsdb/tests/python/vlv.py
@@ -95,7 +95,7 @@ def get_cookie(controls, expected_n=None):
raise ValueError("there is no VLV response")
-class VLVTests(samba.tests.TestCase):
+class TestsWithUserOU(samba.tests.TestCase):
def create_user(self, i, n, prefix='vlvtest', suffix='', attrs=None):
name = "%s%d%s" % (prefix, i, suffix)
@@ -148,7 +148,7 @@ class VLVTests(samba.tests.TestCase):
return user
def setUp(self):
- super(VLVTests, self).setUp()
+ super(TestsWithUserOU, self).setUp()
self.ldb = SamDB(host, credentials=creds,
session_info=system_session(lp), lp=lp)
@@ -189,10 +189,13 @@ class VLVTests(samba.tests.TestCase):
self.delicate_keys = ['cn']
def tearDown(self):
- super(VLVTests, self).tearDown()
+ super(TestsWithUserOU, self).tearDown()
if not opts.delete_in_setup:
self.ldb.delete(self.ou, ['tree_delete:1'])
+
+class VLVTests(TestsWithUserOU):
+
def get_full_list(self, attr, include_cn=False):
"""Fetch the whole list sorted on the attribute, using the VLV.
This way you get a VLV cookie."""
@@ -1183,6 +1186,417 @@ class VLVTests(samba.tests.TestCase):
expected_results = [r for r in full_results if r != del_user[attr]]
self.assertEqual(results, expected_results)
+
+class PagedResultsTests(TestsWithUserOU):
+
+ def paged_search(self, expr, cookie="", page_size=0, extra_ctrls=None,
+ attrs=None, ou=None, subtree=False):
+ ou = ou or self.ou
+ if cookie:
+ cookie = ":" + cookie
+ ctrl = "paged_results:1:" + str(page_size) + cookie
+ controls = [ctrl]
+
+ # If extra controls are provided then add them, else default to
+ # sort control on 'cn' attribute
+ if extra_ctrls is not None:
+ controls += extra_ctrls
+ else:
+ sort_ctrl = "server_sort:1:0:cn"
+ controls.append(sort_ctrl)
+
+ kwargs = {}
+ if attrs is not None:
+ kwargs = {"attrs": attrs}
+
+ scope = ldb.SCOPE_ONELEVEL
+ if subtree:
+ scope = ldb.SCOPE_SUBTREE
+
+ res = self.ldb.search(ou,
+ expression=expr,
+ scope=scope,
+ controls=controls,
+ **kwargs)
+ results = [str(r['cn'][0]) for r in res]
+
+ ctrls = [str(c) for c in res.controls if
+ str(c).startswith("paged_results")]
+ assert len(ctrls) == 1, "no paged_results response"
+
+ spl = ctrls[0].rsplit(':', 3)
+ cookie = ""
+ if len(spl) == 3:
+ cookie = spl[-1]
+ return results, cookie
+
+ def test_paged_delete_during_search(self):
+ expr = "(objectClass=*)"
+
+ # Start new search
+ first_page_size = 3
+ results, cookie = self.paged_search(expr, page_size=first_page_size)
+
+ # Run normal search to get expected results
+ unedited_results, _ = self.paged_search(expr,
+ page_size=len(self.users))
+
+ # Get remaining users not returned by the search above
+ unreturned_users = [u for u in self.users if u['cn'] not in results]
+
+ # Delete one of the users
+ del_index = len(self.users)//2
+ del_user = unreturned_users[del_index]
+ self.ldb.delete(del_user['dn'])
+
+ # Run test
+ results, _ = self.paged_search(expr, cookie=cookie,
+ page_size=len(self.users))
+ expected_results = [r for r in unedited_results[first_page_size:]
+ if r != del_user['cn']]
+ self.assertEqual(results, expected_results)
+
+ def test_paged_show_deleted(self):
+ unique = time.strftime("%s", time.gmtime())[-5:]
+ prefix = "show_deleted_test_%s_" % (unique)
+ expr = "(&(objectClass=user)(cn=%s*))" % (prefix)
+ del_ctrl = "show_deleted:1"
+
+ num_users = 10
+ users = []
+ for i in range(num_users):
+ user = self.create_user(i, num_users, prefix=prefix)
+ users.append(user)
+
+ first_user = users[0]
+ self.ldb.delete(first_user['dn'])
+
+ # Start new search
+ first_page_size = 3
+ results, cookie = self.paged_search(expr, page_size=first_page_size,
+ extra_ctrls=[del_ctrl],
+ ou=self.base_dn,
+ subtree=True)
+
+ # Get remaining users not returned by the search above
+ unreturned_users = [u for u in users if u['cn'] not in results]
+
+ # Delete one of the users
+ del_index = len(users)//2
+ del_user = unreturned_users[del_index]
+ self.ldb.delete(del_user['dn'])
+
+ results2, _ = self.paged_search(expr, cookie=cookie,
+ page_size=len(users)*2,
+ extra_ctrls=[del_ctrl],
+ ou=self.base_dn,
+ subtree=True)
+
+ user_cns = {str(u['cn']) for u in users}
+ deleted_cns = {first_user['cn'], del_user['cn']}
+
+ all_results = results + results2
+ normal_results = {r for r in all_results if "DEL:" not in r}
+ self.assertEqual(normal_results, user_cns - deleted_cns)
+
+ # Deleted results get "\nDEL:<GUID>" added to the CN, so cut it out.
+ deleted_results = {r[:r.index('\n')] for r in all_results
+ if "DEL:" in r}
+ self.assertEqual(deleted_results, deleted_cns)
+
+ def test_paged_add_during_search(self):
+ expr = "(objectClass=*)"
+
+ # Start new search
+ first_page_size = 3
+ results, cookie = self.paged_search(expr, page_size=first_page_size)
+
+ unedited_results, _ = self.paged_search(expr,
+ page_size=len(self.users)+1)
+
+ # Get remaining users not returned by the search above
+ unwalked_users = [cn for cn in unedited_results if cn not in results]
+
+ # Add a user in the middle of the sort order
+ middle_index = len(unwalked_users)//2
+ middle_user = unwalked_users[middle_index]
+
+ user = {'cn': middle_user + '_2', "objectclass": "user"}
+ user['dn'] = "cn=%s,%s" % (user['cn'], self.ou)
+ self.ldb.add(user)
+
+ results, _ = self.paged_search(expr, cookie=cookie,
+ page_size=len(self.users)+1)
+ expected_results = unwalked_users[:]
+
+ # Uncomment this line to assert that adding worked.
+ # expected_results.insert(middle_index+1, user['cn'])
+
+ self.assertEqual(results, expected_results)
+
+ def test_paged_modify_during_search(self):
+ expr = "(objectClass=*)"
+
+ # Start new search
+ first_page_size = 3
+ results, cookie = self.paged_search(expr, page_size=first_page_size)
+
+ unedited_results, _ = self.paged_search(expr,
+ page_size=len(self.users)+1)
+
+ # Modify user in the middle of the remaining sort order
+ unwalked_users = [cn for cn in unedited_results if cn not in results]
+ middle_index = len(unwalked_users)//2
+ middle_cn = unwalked_users[middle_index]
+
+ # Find user object
+ users_with_middle_cn = [u for u in self.users if u['cn'] == middle_cn]
+ self.assertEqual(len(users_with_middle_cn), 1)
+ middle_user = users_with_middle_cn[0]
+
+ # Rename object
+ edit_cn = "z_" + middle_cn
+ new_dn = middle_user['dn'].replace(middle_cn, edit_cn)
+ self.ldb.rename(middle_user['dn'], new_dn)
+
+ results, _ = self.paged_search(expr, cookie=cookie,
+ page_size=len(self.users)+1)
+ expected_results = unwalked_users[:]
+ expected_results[middle_index] = edit_cn
+ self.assertEqual(results, expected_results)
+
+ def test_paged_modify_object_scope(self):
+ expr = "(objectClass=*)"
+
+ ou2 = "OU=vlvtestou2,%s" % (self.base_dn)
+ try:
+ self.ldb.delete(ou2, ['tree_delete:1'])
+ except ldb.LdbError:
+ pass
+ self.ldb.add({"dn": ou2, "objectclass": "organizationalUnit"})
+
+ # Do a separate, full search to get all results
+ unedited_results, _ = self.paged_search(expr,
+ page_size=len(self.users)+1)
+
+ # Rename before starting a search
+ first_cn = self.users[0]['cn']
+ new_dn = "CN=%s,%s" % (first_cn, ou2)
+ self.ldb.rename(self.users[0]['dn'], new_dn)
+
+ # Start new search under the original OU
+ first_page_size = 3
+ results, cookie = self.paged_search(expr, page_size=first_page_size)
+ self.assertEqual(results, unedited_results[1:1+first_page_size])
+
+ # Get one of the users that is yet to be returned
+ unwalked_users = [cn for cn in unedited_results if cn not in results]
+ middle_index = len(unwalked_users)//2
+ middle_cn = unwalked_users[middle_index]
+
+ # Find user object
+ users_with_middle_cn = [u for u in self.users if u['cn'] == middle_cn]
+ self.assertEqual(len(users_with_middle_cn), 1)
+ middle_user = users_with_middle_cn[0]
+
+ # Rename
+ new_dn = "CN=%s,%s" % (middle_cn, ou2)
+ self.ldb.rename(middle_user['dn'], new_dn)
+
+ results, _ = self.paged_search(expr, cookie=cookie,
+ page_size=len(self.users)+1)
+
+ expected_results = unwalked_users[:]
+
+ # We should really expect that the object renamed into a different
+ # OU should vanish from the results, but turns out Windows does return
+ # the object in this case. Our module matches the Windows behaviour.
+
+ # If behaviour changes, this line inverts the test's expectations to
+ # what you might expect.
+ # del expected_results[middle_index]
+
+ # But still expect the user we removed before the search to be gone
+ del expected_results[0]
+
+ self.assertEqual(results, expected_results)
+
+ def assertPagedSearchRaises(self, err_num, expr, cookie, attrs=None,
+ extra_ctrls=None):
+ try:
+ results, _ = self.paged_search(expr, cookie=cookie,
+ page_size=2,
+ extra_ctrls=extra_ctrls,
+ attrs=attrs)
+ except ldb.LdbError as e:
+ self.assertEqual(e.args[0], err_num)
+ return
+
+ self.fail("No error raised by invalid search")
+
+ def test_paged_changed_expr(self):
+ # Initiate search then use a different expr in subsequent req
+ expr = "(objectClass=*)"
+ results, cookie = self.paged_search(expr, page_size=3)
+ expr = "cn>=a"
+ expected_error_num = 12
+ self.assertPagedSearchRaises(expected_error_num, expr, cookie)
+
+ def test_paged_changed_controls(self):
+ expr = "(objectClass=*)"
+ sort_ctrl = "server_sort:1:0:cn"
+ del_ctrl = "show_deleted:1"
+ expected_error_num = 12
+ ps = 3
+
+ # Initiate search with a sort control then remove in subsequent req
+ results, cookie = self.paged_search(expr, page_size=ps,
+ extra_ctrls=[sort_ctrl])
+ self.assertPagedSearchRaises(expected_error_num, expr,
+ cookie, extra_ctrls=[])
+
+ # Initiate search with no sort control then add one in subsequent req
+ results, cookie = self.paged_search(expr, page_size=ps,
+ extra_ctrls=[])
+ self.assertPagedSearchRaises(expected_error_num, expr,
+ cookie, extra_ctrls=[sort_ctrl])
+
+ # Initiate search with show-deleted control then
+ # remove it in subsequent req
+ results, cookie = self.paged_search(expr, page_size=ps,
+ extra_ctrls=[del_ctrl])
+ self.assertPagedSearchRaises(expected_error_num, expr,
+ cookie, extra_ctrls=[])
+
+ # Initiate normal search then add show-deleted control
+ # in subsequent req
+ results, cookie = self.paged_search(expr, page_size=ps,
+ extra_ctrls=[])
+ self.assertPagedSearchRaises(expected_error_num, expr,
+ cookie, extra_ctrls=[del_ctrl])
+
+ # Changing order of controls shouldn't break the search
+ results, cookie = self.paged_search(expr, page_size=ps,
+ extra_ctrls=[del_ctrl, sort_ctrl])
+ try:
+ results, cookie = self.paged_search(expr, page_size=ps,
+ extra_ctrls=[sort_ctrl,
+ del_ctrl])
+ except ldb.LdbError as e:
+ self.fail(e)
+
+ def test_paged_cant_change_controls_data(self):
+ # Some defaults for the rest of the tests
+ expr = "(objectClass=*)"
+ sort_ctrl = "server_sort:1:0:cn"
+ expected_error_num = 12
+
+ # Initiate search with sort control then change it in subsequent req
+ results, cookie = self.paged_search(expr, page_size=3,
+ extra_ctrls=[sort_ctrl])
+ changed_sort_ctrl = "server_sort:1:0:roomNumber"
+ self.assertPagedSearchRaises(expected_error_num, expr,
+ cookie, extra_ctrls=[changed_sort_ctrl])
+
+ # Initiate search with a control with crit=1, then use crit=0
+ results, cookie = self.paged_search(expr, page_size=3,
+ extra_ctrls=[sort_ctrl])
+ changed_sort_ctrl = "server_sort:0:0:cn"
+ self.assertPagedSearchRaises(expected_error_num, expr,
+ cookie, extra_ctrls=[changed_sort_ctrl])
+
+ def test_paged_search_referrals(self):
+ expr = "(objectClass=*)"
+ paged_ctrl = "paged_results:1:5"
+ res = self.ldb.search(self.base_dn,
+ expression=expr,
+ attrs=['cn'],
+ scope=ldb.SCOPE_SUBTREE,
+ controls=[paged_ctrl])
+
+ # Do a paged search walk over the whole database and save a list
+ # of all the referrals returned by each search.
+ referral_lists = []
+
+ while True:
+ referral_lists.append(res.referals)
+
+ ctrls = [str(c) for c in res.controls if
+ str(c).startswith("paged_results")]
+ self.assertEqual(len(ctrls), 1)
+ spl = ctrls[0].rsplit(':')
+ if len(spl) != 3:
+ break
+
+ cookie = spl[-1]
+ res = self.ldb.search(self.base_dn,
+ expression=expr,
+ attrs=['cn'],
+ scope=ldb.SCOPE_SUBTREE,
+ controls=[paged_ctrl + ":" + cookie])
+
+ ref_list = referral_lists[0]
+
+ # Sanity check to make sure the search actually did something
+ self.assertGreater(len(referral_lists), 2)
+
+ # Check the first referral set contains stuff
+ self.assertGreater(len(ref_list), 0)
+
+ # Check the others don't
+ self.assertTrue(all([len(l) == 0 for l in referral_lists[1:]]))
+
+ # Check the entries in the first referral list look like referrals
+ self.assertTrue(all([s.startswith('ldap://') for s in ref_list]))
+
+ def test_paged_change_attrs(self):
+ expr = "(objectClass=*)"
+ attrs = ['cn']
+ expected_error_num = 12
+
+ results, cookie = self.paged_search(expr, page_size=3, attrs=attrs)
+ results, cookie = self.paged_search(expr, cookie=cookie, page_size=3,
+ attrs=attrs)
+
+ changed_attrs = attrs + ['roomNumber']
+ self.assertPagedSearchRaises(expected_error_num, expr,
+ cookie, attrs=changed_attrs,
+ extra_ctrls=[])
+
+ def test_paged_search_lockstep(self):
+ expr = "(objectClass=*)"
+ ps = 3
+
+ all_results, _ = self.paged_search(expr, page_size=len(self.users)+1)
+
+ # Run two different but overlapping paged searches simultaneously.
+ set_1_index = int((len(all_results))//3)
+ set_2_index = int((2*len(all_results))//3)
+ set_1 = all_results[set_1_index:]
+ set_2 = all_results[:set_2_index+1]
+ set_1_expr = "(cn>=%s)" % (all_results[set_1_index])
+ set_2_expr = "(cn<=%s)" % (all_results[set_2_index])
+
+ results, cookie1 = self.paged_search(set_1_expr, page_size=ps)
+ self.assertEqual(results, set_1[:ps])
+ results, cookie2 = self.paged_search(set_2_expr, page_size=ps)
+ self.assertEqual(results, set_2[:ps])
+
+ results, cookie1 = self.paged_search(set_1_expr, cookie=cookie1,
+ page_size=ps)
+ self.assertEqual(results, set_1[ps:ps*2])
+ results, cookie2 = self.paged_search(set_2_expr, cookie=cookie2,
+ page_size=ps)
+ self.assertEqual(results, set_2[ps:ps*2])
+
+ results, _ = self.paged_search(set_1_expr, cookie=cookie1,
+ page_size=len(self.users))
+ self.assertEqual(results, set_1[ps*2:])
+ results, _ = self.paged_search(set_2_expr, cookie=cookie2,
+ page_size=len(self.users))
+ self.assertEqual(results, set_2[ps*2:])
+
+
if "://" not in host:
if os.path.isfile(host):
host = "tdb://%s" % host