diff options
author | Douglas Bagnall <douglas.bagnall@catalyst.net.nz> | 2016-06-30 16:35:08 +1200 |
---|---|---|
committer | Garming Sam <garming@samba.org> | 2016-07-15 10:01:28 +0200 |
commit | 4cb565bc8777fffd3fa9c34117dba1de57c72002 (patch) | |
tree | 30de1c5ab8a68e418ab2ef67b11be65c9896f1ee /source4 | |
parent | 5ce969d0c70afc1f14a9b223edbaec7a847c64de (diff) | |
download | samba-4cb565bc8777fffd3fa9c34117dba1de57c72002.tar.gz |
dsdb tests: add linked attribute tests
Note that this test will not work properly across ldap as the
marked-deleted linked attributes will not appear.
Signed-off-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Diffstat (limited to 'source4')
-rw-r--r-- | source4/dsdb/tests/python/linked_attributes.py | 363 | ||||
-rwxr-xr-x | source4/selftest/tests.py | 1 |
2 files changed, 364 insertions, 0 deletions
diff --git a/source4/dsdb/tests/python/linked_attributes.py b/source4/dsdb/tests/python/linked_attributes.py new file mode 100644 index 00000000000..c7afad4d862 --- /dev/null +++ b/source4/dsdb/tests/python/linked_attributes.py @@ -0,0 +1,363 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Originally based on ./sam.py +import optparse +import sys +import os +import base64 +import random +import re + +sys.path.insert(0, "bin/python") +import samba +from samba.tests.subunitrun import SubunitOptions, TestProgram + +import samba.getopt as options + +from samba.auth import system_session +import ldb +from samba.samdb import SamDB +from samba.dcerpc import misc + +import time + +parser = optparse.OptionParser("linked_attributes.py [options] <host>") +sambaopts = options.SambaOptions(parser) +parser.add_option_group(sambaopts) +parser.add_option_group(options.VersionOptions(parser)) +# use command line creds if available +credopts = options.CredentialsOptions(parser) +parser.add_option_group(credopts) +subunitopts = SubunitOptions(parser) +parser.add_option_group(subunitopts) + +parser.add_option('--delete-in-setup', action='store_true', + help="cleanup in setup") + +parser.add_option('--no-cleanup', action='store_true', + help="don't cleanup in teardown") + +parser.add_option('--no-reveal-internals', action='store_true', + help="Only use windows compatible ldap controls") + +opts, args = parser.parse_args() + +if len(args) < 1: + parser.print_usage() + sys.exit(1) + +host = args[0] + +lp = sambaopts.get_loadparm() +creds = credopts.get_credentials(lp) + + +class LATestException(Exception): + pass + + +class LATests(samba.tests.TestCase): + + def setUp(self): + super(LATests, self).setUp() + self.samdb = SamDB(host, credentials=creds, + session_info=system_session(lp), lp=lp) + + self.base_dn = self.samdb.domain_dn() + self.ou = "OU=la,%s" % self.base_dn + if opts.delete_in_setup: + try: + self.samdb.delete(self.ou, ['tree_delete:1']) + except ldb.LdbError, e: + print "tried deleting %s, got error %s" % (self.ou, e) + self.samdb.add({'objectclass': 'organizationalUnit', + 'dn': self.ou}) + + def tearDown(self): + super(LATests, self).tearDown() + if not opts.no_cleanup: + self.samdb.delete(self.ou, ['tree_delete:1']) + + def delete_user(self, user): + self.samdb.delete(user['dn']) + del self.users[self.users.index(user)] + + def add_object(self, cn, objectclass): + dn = "CN=%s,%s" % (cn, self.ou) + self.samdb.add({'cn': cn, + 'objectclass': objectclass, + 'dn': dn}) + + return dn + + def add_objects(self, n, objectclass, prefix=None): + if prefix is None: + prefix = objectclass + dns = [] + for i in range(n): + dns.append(self.add_object("%s%d" % (prefix, i + 1), + objectclass)) + return dns + + def add_linked_attribute(self, src, dest, attr='member'): + m = ldb.Message() + m.dn = ldb.Dn(self.samdb, src) + m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_ADD, attr) + self.samdb.modify(m) + + def remove_linked_attribute(self, src, dest, attr='member'): + m = ldb.Message() + m.dn = ldb.Dn(self.samdb, src) + m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_DELETE, attr) + self.samdb.modify(m) + + def attr_search(self, obj, expected, attr, scope=ldb.SCOPE_BASE, + **controls): + if opts.no_reveal_internals: + if 'reveal_internals' in controls: + del controls['reveal_internals'] + + controls = ['%s:%d' % (k, int(v)) for k, v in controls.items()] + + res = self.samdb.search(obj, + scope=scope, + attrs=[attr], + controls=controls) + return res + + def assert_links(self, obj, expected, attr, sorted=False, msg='', + **kwargs): + res = self.attr_search(obj, expected, attr, **kwargs) + + if len(expected) == 0: + if attr in res[0]: + self.fail("found attr '%s' in %s" % (attr, res[0])) + return + + try: + results = list([x[attr] for x in res][0]) + except KeyError: + self.fail("missing attr '%s' on %s" % (attr, obj)) + + if sorted == False: + results = set(results) + expected = set(expected) + + if expected != results: + print msg + print "expected %s" % expected + print "received %s" % results + + self.assertEqual(results, expected) + + def assert_back_links(self, obj, expected, attr='memberOf', **kwargs): + self.assert_links(obj, expected, attr=attr, + msg='back links do not match', **kwargs) + + def assert_forward_links(self, obj, expected, attr='member', **kwargs): + self.assert_links(obj, expected, attr=attr, + msg='forward links do not match', **kwargs) + + def get_object_guid(self, dn): + res = self.samdb.search(dn, + scope=ldb.SCOPE_BASE, + attrs=['objectGUID']) + return str(misc.GUID(res[0]['objectGUID'][0])) + + def _test_la_backlinks(self, reveal=False): + tag = 'backlinks' + kwargs = {} + if reveal: + tag += '_reveal' + kwargs = {'reveal_internals': 0} + + u1, u2 = self.add_objects(2, 'user', 'u_%s' % tag) + g1, g2 = self.add_objects(2, 'group', 'g_%s' % tag) + + self.add_linked_attribute(g1, u1) + self.add_linked_attribute(g2, u1) + self.add_linked_attribute(g2, u2) + + self.assert_back_links(u1, [g1, g2], **kwargs) + self.assert_back_links(u2, [g2], **kwargs) + + def test_la_backlinks(self): + self._test_la_backlinks() + + def test_la_backlinks_reveal(self): + if opts.no_reveal_internals: + print 'skipping because --no-reveal-internals' + return + self._test_la_backlinks(True) + + def _test_la_backlinks_delete_group(self, reveal=False): + tag = 'del_group' + kwargs = {} + if reveal: + tag += '_reveal' + kwargs = {'reveal_internals': 0} + + u1, u2 = self.add_objects(2, 'user', 'u_' + tag) + g1, g2 = self.add_objects(2, 'group', 'g_' + tag) + + self.add_linked_attribute(g1, u1) + self.add_linked_attribute(g2, u1) + self.add_linked_attribute(g2, u2) + + self.samdb.delete(g2, ['tree_delete:1']) + + self.assert_back_links(u1, [g1], **kwargs) + self.assert_back_links(u2, set(), **kwargs) + + def test_la_backlinks_delete_group(self): + self._test_la_backlinks_delete_group() + + def test_la_backlinks_delete_group_reveal(self): + if opts.no_reveal_internals: + print 'skipping because --no-reveal-internals' + return + self._test_la_backlinks_delete_group(True) + + def test_links_all_delete_group(self): + u1, u2 = self.add_objects(2, 'user', 'u_all_del_group') + g1, g2 = self.add_objects(2, 'group', 'g_all_del_group') + g2guid = self.get_object_guid(g2) + + self.add_linked_attribute(g1, u1) + self.add_linked_attribute(g2, u1) + self.add_linked_attribute(g2, u2) + + self.samdb.delete(g2) + self.assert_back_links(u1, [g1], show_deleted=1, show_recycled=1, + show_deactivated_link=0) + self.assert_back_links(u2, set(), show_deleted=1, show_recycled=1, + show_deactivated_link=0) + self.assert_forward_links(g1, [u1], show_deleted=1, show_recycled=1, + show_deactivated_link=0) + self.assert_forward_links('<GUID=%s>' % g2guid, + [], show_deleted=1, show_recycled=1, + show_deactivated_link=0) + + def test_links_all_delete_group_reveal(self): + u1, u2 = self.add_objects(2, 'user', 'u_all_del_group_reveal') + g1, g2 = self.add_objects(2, 'group', 'g_all_del_group_reveal') + g2guid = self.get_object_guid(g2) + + self.add_linked_attribute(g1, u1) + self.add_linked_attribute(g2, u1) + self.add_linked_attribute(g2, u2) + + self.samdb.delete(g2) + self.assert_back_links(u1, [g1], show_deleted=1, show_recycled=1, + show_deactivated_link=0, + reveal_internals=0) + self.assert_back_links(u2, set(), show_deleted=1, show_recycled=1, + show_deactivated_link=0, + reveal_internals=0) + self.assert_forward_links(g1, [u1], show_deleted=1, show_recycled=1, + show_deactivated_link=0, + reveal_internals=0) + self.assert_forward_links('<GUID=%s>' % g2guid, + [], show_deleted=1, show_recycled=1, + show_deactivated_link=0, + reveal_internals=0) + + def test_la_links_delete_link(self): + u1, u2 = self.add_objects(2, 'user', 'u_del_link') + g1, g2 = self.add_objects(2, 'group', 'g_del_link') + + self.add_linked_attribute(g1, u1) + self.add_linked_attribute(g2, u1) + self.add_linked_attribute(g2, u2) + + self.remove_linked_attribute(g2, u1) + + self.assert_forward_links(g1, [u1]) + self.assert_forward_links(g2, [u2]) + + self.add_linked_attribute(g2, u1) + self.assert_forward_links(g2, [u1, u2]) + self.remove_linked_attribute(g2, u2) + self.assert_forward_links(g2, [u1]) + self.remove_linked_attribute(g2, u1) + self.assert_forward_links(g2, []) + + def test_la_links_delete_link_reveal(self): + u1, u2 = self.add_objects(2, 'user', 'u_del_link_reveal') + g1, g2 = self.add_objects(2, 'group', 'g_del_link_reveal') + + self.add_linked_attribute(g1, u1) + self.add_linked_attribute(g2, u1) + self.add_linked_attribute(g2, u2) + + self.remove_linked_attribute(g2, u1) + + self.assert_forward_links(g2, [u1, u2], show_deleted=1, + show_recycled=1, + show_deactivated_link=0, + reveal_internals=0 + ) + + def test_la_links_delete_user(self): + u1, u2 = self.add_objects(2, 'user', 'u_del_user') + g1, g2 = self.add_objects(2, 'group', 'g_del_user') + + self.add_linked_attribute(g1, u1) + self.add_linked_attribute(g2, u1) + self.add_linked_attribute(g2, u2) + + self.samdb.delete(u1) + + self.assert_forward_links(g1, []) + self.assert_forward_links(g2, [u2]) + + def test_la_links_delete_user_reveal(self): + u1, u2 = self.add_objects(2, 'user', 'u_del_user_reveal') + g1, g2 = self.add_objects(2, 'group', 'g_del_user_reveal') + + self.add_linked_attribute(g1, u1) + self.add_linked_attribute(g2, u1) + self.add_linked_attribute(g2, u2) + + self.samdb.delete(u1) + + self.assert_forward_links(g2, [u2], + show_deleted=1, show_recycled=1, + show_deactivated_link=0, + reveal_internals=0) + self.assert_forward_links(g1, [], + show_deleted=1, show_recycled=1, + show_deactivated_link=0, + reveal_internals=0) + + def _test_la_links_sort_order(self): + u1, u2, u3 = self.add_objects(3, 'user', 'u_sort_order') + g1, g2, g3 = self.add_objects(3, 'group', 'g_sort_order') + + # Add these in a haphazard order + self.add_linked_attribute(g2, u3) + self.add_linked_attribute(g3, u2) + self.add_linked_attribute(g1, u3) + self.add_linked_attribute(g1, u1) + self.add_linked_attribute(g2, u1) + self.add_linked_attribute(g2, u2) + self.add_linked_attribute(g3, u3) + self.add_linked_attribute(g3, u1) + + self.assert_forward_links(g1, [u3, u1], sorted=True) + self.assert_forward_links(g2, [u3, u2, u1], sorted=True) + self.assert_forward_links(g3, [u3, u2, u1], sorted=True) + + self.assert_back_links(u1, [g3, g2, g1], sorted=True) + self.assert_back_links(u2, [g3, g2], sorted=True) + self.assert_back_links(u3, [g3, g2, g1], sorted=True) + + +if "://" not in host: + if os.path.isfile(host): + host = "tdb://%s" % host + else: + host = "ldap://%s" % host + + +TestProgram(module=__name__, opts=subunitopts) diff --git a/source4/selftest/tests.py b/source4/selftest/tests.py index 1b1394017a7..01cb87b1db3 100755 --- a/source4/selftest/tests.py +++ b/source4/selftest/tests.py @@ -583,6 +583,7 @@ plantestsuite_loadlist("samba4.ldap.sites.python(ad_dc_ntvfs)", "ad_dc_ntvfs", [ plantestsuite_loadlist("samba4.ldap.sort.python(ad_dc_ntvfs)", "ad_dc_ntvfs", [python, os.path.join(samba4srcdir, "dsdb/tests/python/sort.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT']) plantestsuite_loadlist("samba4.ldap.vlv.python(ad_dc_ntvfs)", "ad_dc_ntvfs", [python, os.path.join(samba4srcdir, "dsdb/tests/python/vlv.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT']) +plantestsuite_loadlist("samba4.ldap.linked_attributes.python(ad_dc_ntvfs)", "ad_dc_ntvfs:local", [python, os.path.join(samba4srcdir, "dsdb/tests/python/linked_attributes.py"), '$PREFIX_ABS/ad_dc_ntvfs/private/sam.ldb', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT']) for env in ["ad_dc_ntvfs", "fl2000dc", "fl2003dc", "fl2008r2dc"]: plantestsuite_loadlist("samba4.ldap_schema.python(%s)" % env, env, [python, os.path.join(samba4srcdir, "dsdb/tests/python/ldap_schema.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT']) |