diff options
-rw-r--r-- | selftest/knownfail.d/asq | 1 | ||||
-rw-r--r-- | source4/dsdb/tests/python/asq.py | 171 | ||||
-rwxr-xr-x | source4/selftest/tests.py | 1 |
3 files changed, 173 insertions, 0 deletions
diff --git a/selftest/knownfail.d/asq b/selftest/knownfail.d/asq new file mode 100644 index 00000000000..eb0e3e0aba1 --- /dev/null +++ b/selftest/knownfail.d/asq @@ -0,0 +1 @@ +samba4.asq.python\(ad_dc_default\).__main__.ASQLDAPTest.test_asq_paged
\ No newline at end of file diff --git a/source4/dsdb/tests/python/asq.py b/source4/dsdb/tests/python/asq.py new file mode 100644 index 00000000000..a32c9f40cd3 --- /dev/null +++ b/source4/dsdb/tests/python/asq.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python3 +# +# Test ASQ LDAP control behaviour in Samba +# Copyright (C) Andrew Bartlett 2019-2020 +# +# Based on Unit tests for the notification control +# Copyright (C) Stefan Metzmacher 2016 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import optparse +import sys +import os +import random + +sys.path.insert(0, "bin/python") +import samba +from samba.tests.subunitrun import SubunitOptions, TestProgram + +import samba.getopt as options + +from samba.auth import system_session +from samba import ldb +from samba.samdb import SamDB +from samba.ndr import ndr_unpack +from samba import gensec +from samba.credentials import Credentials +import samba.tests + +from ldb import SCOPE_SUBTREE, SCOPE_ONELEVEL, SCOPE_BASE, LdbError +from ldb import ERR_TIME_LIMIT_EXCEEDED, ERR_ADMIN_LIMIT_EXCEEDED, ERR_UNWILLING_TO_PERFORM +from ldb import Message + +parser = optparse.OptionParser("large_ldap.py [options] <host>") +sambaopts = options.SambaOptions(parser) +parser.add_option_group(sambaopts) +parser.add_option_group(options.VersionOptions(parser)) +# use command line creds if available +credopts = options.CredentialsOptions(parser) +parser.add_option_group(credopts) +subunitopts = SubunitOptions(parser) +parser.add_option_group(subunitopts) +opts, args = parser.parse_args() + +if len(args) < 1: + parser.print_usage() + sys.exit(1) + +url = args[0] + +lp = sambaopts.get_loadparm() +creds = credopts.get_credentials(lp) + + +class ASQLDAPTest(samba.tests.TestCase): + + def setUp(self): + super(ASQLDAPTest, self).setUp() + self.ldb = samba.Ldb(url, credentials=creds, session_info=system_session(lp), lp=lp) + self.base_dn = self.ldb.get_default_basedn() + self.NAME_ASQ="asq_" + format(random.randint(0, 99999), "05") + self.OU_NAME_ASQ= self.NAME_ASQ + "_ou" + self.ou_dn = ldb.Dn(self.ldb, "ou=" + self.OU_NAME_ASQ + "," + str(self.base_dn)) + + samba.tests.delete_force(self.ldb, self.ou_dn, + controls=['tree_delete:1']) + + self.ldb.add({ + "dn": self.ou_dn, + "objectclass": "organizationalUnit", + "ou": self.OU_NAME_ASQ}) + + self.members = [] + self.members2 = [] + + for x in range(20): + name = self.NAME_ASQ + "_" + str(x) + dn = ldb.Dn(self.ldb, + "cn=" + name + "," + str(self.ou_dn)) + self.members.append(dn) + self.ldb.add({ + "dn": dn, + "objectclass": "group"}) + + for x in range(20): + name = self.NAME_ASQ + "_" + str(x + 20) + dn = ldb.Dn(self.ldb, + "cn=" + name + "," + str(self.ou_dn)) + self.members2.append(dn) + self.ldb.add({ + "dn": dn, + "objectclass": "group", + "member": [str(x) for x in self.members]}) + + name = self.NAME_ASQ + "_" + str(x + 40) + self.top_dn = ldb.Dn(self.ldb, + "cn=" + name + "," + str(self.ou_dn)) + self.ldb.add({ + "dn": self.top_dn, + "objectclass": "group", + "member": [str(x) for x in self.members2]}) + + def tearDown(self): + samba.tests.delete_force(self.ldb, self.ou_dn, + controls=['tree_delete:1']) + + def test_asq(self): + """Testing ASQ behaviour. + + ASQ is very strange, it turns a BASE search into a search for + all the objects pointed to by the specified attribute, + returning multiple entries! + + """ + + msgs = self.ldb.search(base=self.top_dn, + scope=ldb.SCOPE_BASE, + attrs=["objectGUID", "cn", "member"], + controls=["asq:1:member"]) + + self.assertEqual(len(msgs), 20) + + for msg in msgs: + self.assertNotEqual(msg.dn, self.top_dn) + self.assertIn(msg.dn, self.members2) + for group in msg["member"]: + self.assertIn(ldb.Dn(self.ldb, str(group)), + self.members) + + def test_asq_paged(self): + """Testing ASQ behaviour with paged_results set. + + ASQ is very strange, it turns a BASE search into a search for + all the objects pointed to by the specified attribute, + returning multiple entries! + + """ + + msgs = self.ldb.search(base=self.top_dn, + scope=ldb.SCOPE_BASE, + attrs=["objectGUID", "cn", "member"], + controls=["asq:1:member", + "paged_results:1:1024"]) + + self.assertEqual(len(msgs), 20) + + for msg in msgs: + self.assertNotEqual(msg.dn, self.top_dn) + self.assertIn(msg.dn, self.members2) + for group in msg["member"]: + self.assertIn(ldb.Dn(self.ldb, str(group)), + self.members) + +if "://" not in url: + if os.path.isfile(url): + url = "tdb://%s" % url + else: + url = "ldap://%s" % url + +TestProgram(module=__name__, opts=subunitopts) diff --git a/source4/selftest/tests.py b/source4/selftest/tests.py index ae2b10ae659..52db18a872b 100755 --- a/source4/selftest/tests.py +++ b/source4/selftest/tests.py @@ -885,6 +885,7 @@ plantestsuite_loadlist("samba4.tokengroups.krb5.python(ad_dc_default)", "ad_dc_d plantestsuite_loadlist("samba4.tokengroups.ntlm.python(ad_dc_default)", "ad_dc_default:local", [python, os.path.join(DSDB_PYTEST_DIR, "token_group.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '-k', 'no', '$LOADLIST', '$LISTOPT']) plantestsuite("samba4.sam.python(fl2008r2dc)", "fl2008r2dc", [python, os.path.join(DSDB_PYTEST_DIR, "sam.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN']) plantestsuite("samba4.sam.python(ad_dc_default)", "ad_dc_default", [python, os.path.join(DSDB_PYTEST_DIR, "sam.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN']) +plantestsuite("samba4.asq.python(ad_dc_default)", "ad_dc_default", [python, os.path.join(DSDB_PYTEST_DIR, "asq.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN']) plantestsuite("samba4.user_account_control.python(ad_dc_default)", "ad_dc_default", [python, os.path.join(DSDB_PYTEST_DIR, "user_account_control.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN']) for env in ['ad_dc_default:local', 'schema_dc:local']: |