diff options
Diffstat (limited to 'source4/dsdb/tests/python/vlv.py')
-rw-r--r-- | source4/dsdb/tests/python/vlv.py | 184 |
1 files changed, 124 insertions, 60 deletions
diff --git a/source4/dsdb/tests/python/vlv.py b/source4/dsdb/tests/python/vlv.py index bc07a53d575..acaf64d0faa 100644 --- a/source4/dsdb/tests/python/vlv.py +++ b/source4/dsdb/tests/python/vlv.py @@ -152,7 +152,7 @@ class TestsWithUserOU(samba.tests.TestCase): super(TestsWithUserOU, self).setUp() self.ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp) - + self.ldb_ro = self.ldb self.base_dn = self.ldb.domain_dn() self.ou = "ou=vlv,%s" % self.base_dn if opts.delete_in_setup: @@ -195,8 +195,60 @@ class TestsWithUserOU(samba.tests.TestCase): self.ldb.delete(self.ou, ['tree_delete:1']) -class VLVTests(TestsWithUserOU): +class VLVTestsBase(TestsWithUserOU): + + # Run a vlv search and return important fields of the response control + def vlv_search(self, attr, expr, cookie="", after_count=0, offset=1): + sort_ctrl = "server_sort:1:0:%s" % attr + ctrl = "vlv:1:0:%d:%d:0" % (after_count, offset) + if cookie: + ctrl += ":" + cookie + + res = self.ldb_ro.search(self.ou, + expression=expr, + scope=ldb.SCOPE_ONELEVEL, + attrs=[attr], + controls=[ctrl, sort_ctrl]) + results = [str(x[attr][0]) for x in res] + + ctrls = [str(c) for c in res.controls if + str(c).startswith('vlv')] + self.assertEqual(len(ctrls), 1) + + spl = ctrls[0].rsplit(':') + cookie = "" + if len(spl) == 6: + cookie = spl[-1] + + return results, cookie + + +class VLVTestsRO(VLVTestsBase): + def test_vlv_simple_double_run(self): + """Do the simplest possible VLV query to confirm if VLV + works at all. Useful for showing VLV as a whole works + on Global Catalog (for example)""" + attr = 'roomNumber' + expr = "(objectclass=user)" + + # Start new search + full_results, cookie = self.vlv_search(attr, expr, + after_count=len(self.users)) + + results, cookie = self.vlv_search(attr, expr, cookie=cookie, + after_count=len(self.users)) + expected_results = full_results + self.assertEqual(results, expected_results) + + +class VLVTestsGC(VLVTestsRO): + def setUp(self): + super(VLVTestsRO, self).setUp() + self.ldb_ro = SamDB(host + ":3268", credentials=creds, + session_info=system_session(lp), lp=lp) + +class VLVTests(VLVTestsBase): def get_full_list(self, attr, include_cn=False): """Fetch the whole list sorted on the attribute, using the VLV. This way you get a VLV cookie.""" @@ -1077,31 +1129,6 @@ class VLVTests(TestsWithUserOU): controls=[sort_control, "vlv:0:1:1:1:0:%s" % vlv_cookies[-1]]) - # Run a vlv search and return important fields of the response control - def vlv_search(self, attr, expr, cookie="", after_count=0, offset=1): - sort_ctrl = "server_sort:1:0:%s" % attr - ctrl = "vlv:1:0:%d:%d:0" % (after_count, offset) - if cookie: - ctrl += ":" + cookie - - res = self.ldb.search(self.ou, - expression=expr, - scope=ldb.SCOPE_ONELEVEL, - attrs=[attr], - controls=[ctrl, sort_ctrl]) - results = [str(x[attr][0]) for x in res] - - ctrls = [str(c) for c in res.controls if - str(c).startswith('vlv')] - self.assertEqual(len(ctrls), 1) - - spl = ctrls[0].rsplit(':') - cookie = "" - if len(spl) == 6: - cookie = spl[-1] - - return results, cookie - def test_vlv_modify_during_view(self): attr = 'roomNumber' expr = "(objectclass=user)" @@ -1214,11 +1241,11 @@ class PagedResultsTests(TestsWithUserOU): if subtree: scope = ldb.SCOPE_SUBTREE - res = self.ldb.search(ou, - expression=expr, - scope=scope, - controls=controls, - **kwargs) + res = self.ldb_ro.search(ou, + expression=expr, + scope=scope, + controls=controls, + **kwargs) results = [str(r['cn'][0]) for r in res] ctrls = [str(c) for c in res.controls if @@ -1231,6 +1258,53 @@ class PagedResultsTests(TestsWithUserOU): cookie = spl[-1] return results, cookie + +class PagedResultsTestsRO(PagedResultsTests): + + def test_paged_search_lockstep(self): + expr = "(objectClass=*)" + ps = 3 + + all_results, _ = self.paged_search(expr, page_size=len(self.users)+1) + + # Run two different but overlapping paged searches simultaneously. + set_1_index = int((len(all_results))//3) + set_2_index = int((2*len(all_results))//3) + set_1 = all_results[set_1_index:] + set_2 = all_results[:set_2_index+1] + set_1_expr = "(cn>=%s)" % (all_results[set_1_index]) + set_2_expr = "(cn<=%s)" % (all_results[set_2_index]) + + results, cookie1 = self.paged_search(set_1_expr, page_size=ps) + self.assertEqual(results, set_1[:ps]) + results, cookie2 = self.paged_search(set_2_expr, page_size=ps) + self.assertEqual(results, set_2[:ps]) + + results, cookie1 = self.paged_search(set_1_expr, cookie=cookie1, + page_size=ps) + self.assertEqual(results, set_1[ps:ps*2]) + results, cookie2 = self.paged_search(set_2_expr, cookie=cookie2, + page_size=ps) + self.assertEqual(results, set_2[ps:ps*2]) + + results, _ = self.paged_search(set_1_expr, cookie=cookie1, + page_size=len(self.users)) + self.assertEqual(results, set_1[ps*2:]) + results, _ = self.paged_search(set_2_expr, cookie=cookie2, + page_size=len(self.users)) + self.assertEqual(results, set_2[ps*2:]) + + +class PagedResultsTestsGC(PagedResultsTestsRO): + + def setUp(self): + super(PagedResultsTestsRO, self).setUp() + self.ldb_ro = SamDB(host + ":3268", credentials=creds, + session_info=system_session(lp), lp=lp) + + +class PagedResultsTestsRW(PagedResultsTests): + def test_paged_delete_during_search(self): expr = "(objectClass=*)" @@ -1611,38 +1685,28 @@ class PagedResultsTests(TestsWithUserOU): cookie, attrs=changed_attrs, extra_ctrls=[]) - def test_paged_search_lockstep(self): - expr = "(objectClass=*)" - ps = 3 + def test_vlv_paged(self): + """Testing behaviour with VLV and paged_results set. - all_results, _ = self.paged_search(expr, page_size=len(self.users)+1) + A strange combination, certainly - # Run two different but overlapping paged searches simultaneously. - set_1_index = int((len(all_results))//3) - set_2_index = int((2*len(all_results))//3) - set_1 = all_results[set_1_index:] - set_2 = all_results[:set_2_index+1] - set_1_expr = "(cn>=%s)" % (all_results[set_1_index]) - set_2_expr = "(cn<=%s)" % (all_results[set_2_index]) + Thankfully combining both of these gives + unavailable-critical-extension against Windows 1709 - results, cookie1 = self.paged_search(set_1_expr, page_size=ps) - self.assertEqual(results, set_1[:ps]) - results, cookie2 = self.paged_search(set_2_expr, page_size=ps) - self.assertEqual(results, set_2[:ps]) - - results, cookie1 = self.paged_search(set_1_expr, cookie=cookie1, - page_size=ps) - self.assertEqual(results, set_1[ps:ps*2]) - results, cookie2 = self.paged_search(set_2_expr, cookie=cookie2, - page_size=ps) - self.assertEqual(results, set_2[ps:ps*2]) + """ + sort_control = "server_sort:1:0:cn" - results, _ = self.paged_search(set_1_expr, cookie=cookie1, - page_size=len(self.users)) - self.assertEqual(results, set_1[ps*2:]) - results, _ = self.paged_search(set_2_expr, cookie=cookie2, - page_size=len(self.users)) - self.assertEqual(results, set_2[ps*2:]) + try: + msgs = self.ldb.search(base=self.base_dn, + scope=ldb.SCOPE_SUBTREE, + attrs=["objectGUID", "cn", "member"], + controls=["vlv:1:20:20:11:0", + sort_control, + "paged_results:1:1024"]) + self.fail("should have failed with LDAP_UNAVAILABLE_CRITICAL_EXTENSION") + except ldb.LdbError as e: + (enum, estr) = e.args + self.assertEqual(enum, ldb.ERR_UNSUPPORTED_CRITICAL_EXTENSION) if "://" not in host: |