diff options
Diffstat (limited to 'python/samba/tests')
53 files changed, 7489 insertions, 0 deletions
diff --git a/python/samba/tests/__init__.py b/python/samba/tests/__init__.py new file mode 100644 index 00000000000..2df30a641bf --- /dev/null +++ b/python/samba/tests/__init__.py @@ -0,0 +1,237 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007-2010 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Samba Python tests.""" + +import os +import ldb +import samba +import samba.auth +from samba import param +from samba.samdb import SamDB +import subprocess +import tempfile + +samba.ensure_external_module("testtools", "testtools") + +# Other modules import these two classes from here, for convenience: +from testtools.testcase import ( + TestCase as TesttoolsTestCase, + TestSkipped, + ) + + +class TestCase(TesttoolsTestCase): + """A Samba test case.""" + + def setUp(self): + super(TestCase, self).setUp() + test_debug_level = os.getenv("TEST_DEBUG_LEVEL") + if test_debug_level is not None: + test_debug_level = int(test_debug_level) + self._old_debug_level = samba.get_debug_level() + samba.set_debug_level(test_debug_level) + self.addCleanup(samba.set_debug_level, test_debug_level) + + def get_loadparm(self): + return env_loadparm() + + def get_credentials(self): + return cmdline_credentials + + +class LdbTestCase(TesttoolsTestCase): + """Trivial test case for running tests against a LDB.""" + + def setUp(self): + super(LdbTestCase, self).setUp() + self.filename = os.tempnam() + self.ldb = samba.Ldb(self.filename) + + def set_modules(self, modules=[]): + """Change the modules for this Ldb.""" + m = ldb.Message() + m.dn = ldb.Dn(self.ldb, "@MODULES") + m["@LIST"] = ",".join(modules) + self.ldb.add(m) + self.ldb = samba.Ldb(self.filename) + + +class TestCaseInTempDir(TestCase): + + def setUp(self): + super(TestCaseInTempDir, self).setUp() + self.tempdir = tempfile.mkdtemp() + self.addCleanup(self._remove_tempdir) + + def _remove_tempdir(self): + self.assertEquals([], os.listdir(self.tempdir)) + os.rmdir(self.tempdir) + self.tempdir = None + + +def env_loadparm(): + lp = param.LoadParm() + try: + lp.load(os.environ["SMB_CONF_PATH"]) + except KeyError: + raise Exception("SMB_CONF_PATH not set") + return lp + + +def env_get_var_value(var_name): + """Returns value for variable in os.environ + + Function throws AssertionError if variable is defined. + Unit-test based python tests require certain input params + to be set in environment, otherwise they can't be run + """ + assert var_name in os.environ.keys(), "Please supply %s in environment" % var_name + return os.environ[var_name] + + +cmdline_credentials = None + +class RpcInterfaceTestCase(TestCase): + """DCE/RPC Test case.""" + + +class ValidNetbiosNameTests(TestCase): + + def test_valid(self): + self.assertTrue(samba.valid_netbios_name("FOO")) + + def test_too_long(self): + self.assertFalse(samba.valid_netbios_name("FOO"*10)) + + def test_invalid_characters(self): + self.assertFalse(samba.valid_netbios_name("*BLA")) + + +class BlackboxProcessError(Exception): + """This is raised when check_output() process returns a non-zero exit status + + Exception instance should contain the exact exit code (S.returncode), + command line (S.cmd), process output (S.stdout) and process error stream + (S.stderr) + """ + + def __init__(self, returncode, cmd, stdout, stderr): + self.returncode = returncode + self.cmd = cmd + self.stdout = stdout + self.stderr = stderr + + def __str__(self): + return "Command '%s'; exit status %d; stdout: '%s'; stderr: '%s'" % (self.cmd, self.returncode, + self.stdout, self.stderr) + +class BlackboxTestCase(TestCase): + """Base test case for blackbox tests.""" + + def _make_cmdline(self, line): + bindir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../../bin")) + parts = line.split(" ") + if os.path.exists(os.path.join(bindir, parts[0])): + parts[0] = os.path.join(bindir, parts[0]) + line = " ".join(parts) + return line + + def check_run(self, line): + line = self._make_cmdline(line) + p = subprocess.Popen(line, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + retcode = p.wait() + if retcode: + raise BlackboxProcessError(retcode, line, p.stdout.read(), p.stderr.read()) + + def check_output(self, line): + line = self._make_cmdline(line) + p = subprocess.Popen(line, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True) + retcode = p.wait() + if retcode: + raise BlackboxProcessError(retcode, line, p.stdout.read(), p.stderr.read()) + return p.stdout.read() + +def connect_samdb(samdb_url, lp=None, session_info=None, credentials=None, + flags=0, ldb_options=None, ldap_only=False): + """Create SamDB instance and connects to samdb_url database. + + :param samdb_url: Url for database to connect to. + :param lp: Optional loadparm object + :param session_info: Optional session information + :param credentials: Optional credentials, defaults to anonymous. + :param flags: Optional LDB flags + :param ldap_only: If set, only remote LDAP connection will be created. + + Added value for tests is that we have a shorthand function + to make proper URL for ldb.connect() while using default + parameters for connection based on test environment + """ + samdb_url = samdb_url.lower() + if not "://" in samdb_url: + if not ldap_only and os.path.isfile(samdb_url): + samdb_url = "tdb://%s" % samdb_url + else: + samdb_url = "ldap://%s" % samdb_url + # use 'paged_search' module when connecting remotely + if samdb_url.startswith("ldap://"): + ldb_options = ["modules:paged_searches"] + elif ldap_only: + raise AssertionError("Trying to connect to %s while remote " + "connection is required" % samdb_url) + + # set defaults for test environment + if lp is None: + lp = env_loadparm() + if session_info is None: + session_info = samba.auth.system_session(lp) + if credentials is None: + credentials = cmdline_credentials + + return SamDB(url=samdb_url, + lp=lp, + session_info=session_info, + credentials=credentials, + flags=flags, + options=ldb_options) + + +def connect_samdb_ex(samdb_url, lp=None, session_info=None, credentials=None, + flags=0, ldb_options=None, ldap_only=False): + """Connects to samdb_url database + + :param samdb_url: Url for database to connect to. + :param lp: Optional loadparm object + :param session_info: Optional session information + :param credentials: Optional credentials, defaults to anonymous. + :param flags: Optional LDB flags + :param ldap_only: If set, only remote LDAP connection will be created. + :return: (sam_db_connection, rootDse_record) tuple + """ + sam_db = connect_samdb(samdb_url, lp, session_info, credentials, + flags, ldb_options, ldap_only) + # fetch RootDse + res = sam_db.search(base="", expression="", scope=ldb.SCOPE_BASE, + attrs=["*"]) + return (sam_db, res[0]) + + +def delete_force(samdb, dn): + try: + samdb.delete(dn) + except ldb.LdbError, (num, _): + assert(num == ldb.ERR_NO_SUCH_OBJECT) diff --git a/python/samba/tests/auth.py b/python/samba/tests/auth.py new file mode 100644 index 00000000000..f71e1a784d2 --- /dev/null +++ b/python/samba/tests/auth.py @@ -0,0 +1,31 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for the Auth Python bindings. + +Note that this just tests the bindings work. It does not intend to test +the functionality, that's already done in other tests. +""" + +from samba import auth +import samba.tests + +class AuthTests(samba.tests.TestCase): + + def test_system_session(self): + auth.system_session() + diff --git a/python/samba/tests/blackbox/__init__.py b/python/samba/tests/blackbox/__init__.py new file mode 100644 index 00000000000..361e5cfe5e5 --- /dev/null +++ b/python/samba/tests/blackbox/__init__.py @@ -0,0 +1,17 @@ +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2008 + +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Blackbox tests. """ diff --git a/python/samba/tests/blackbox/ndrdump.py b/python/samba/tests/blackbox/ndrdump.py new file mode 100644 index 00000000000..fca9a931533 --- /dev/null +++ b/python/samba/tests/blackbox/ndrdump.py @@ -0,0 +1,49 @@ +# Blackbox tests for ndrdump +# Copyright (C) 2008 Andrew Tridgell <tridge@samba.org> +# Copyright (C) 2008 Andrew Bartlett <abartlet@samba.org> +# Copyright (C) 2010 Jelmer Vernooij <jelmer@samba.org> +# based on test_smbclient.sh + +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Blackbox tests for ndrdump.""" + +import os +from samba.tests import BlackboxTestCase + +for p in [ "../../../../../source4/librpc/tests", "../../../../../librpc/tests"]: + data_path_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), p)) + print data_path_dir + if os.path.exists(data_path_dir): + break + + +class NdrDumpTests(BlackboxTestCase): + """Blackbox tests for ndrdump.""" + + def data_path(self, name): + return os.path.join(data_path_dir, name) + + def test_ndrdump_with_in(self): + self.check_run("ndrdump samr samr_CreateUser in %s" % (self.data_path("samr-CreateUser-in.dat"))) + + def test_ndrdump_with_out(self): + self.check_run("ndrdump samr samr_CreateUser out %s" % (self.data_path("samr-CreateUser-out.dat"))) + + def test_ndrdump_context_file(self): + self.check_run("ndrdump --context-file %s samr samr_CreateUser out %s" % (self.data_path("samr-CreateUser-in.dat"), self.data_path("samr-CreateUser-out.dat"))) + + def test_ndrdump_with_validate(self): + self.check_run("ndrdump --validate samr samr_CreateUser in %s" % (self.data_path("samr-CreateUser-in.dat"))) diff --git a/python/samba/tests/blackbox/samba_tool_drs.py b/python/samba/tests/blackbox/samba_tool_drs.py new file mode 100644 index 00000000000..62d7bf123bb --- /dev/null +++ b/python/samba/tests/blackbox/samba_tool_drs.py @@ -0,0 +1,97 @@ +# Blackbox tests for "samba-tool drs" command +# Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Blackbox tests for samba-tool drs.""" + +import samba.tests + + +class SambaToolDrsTests(samba.tests.BlackboxTestCase): + """Blackbox test case for samba-tool drs.""" + + def setUp(self): + super(SambaToolDrsTests, self).setUp() + + self.dc1 = samba.tests.env_get_var_value("DC1") + self.dc2 = samba.tests.env_get_var_value("DC2") + + creds = self.get_credentials() + self.cmdline_creds = "-U%s/%s%%%s" % (creds.get_domain(), + creds.get_username(), creds.get_password()) + + def _get_rootDSE(self, dc): + samdb = samba.tests.connect_samdb(dc, lp=self.get_loadparm(), + credentials=self.get_credentials(), + ldap_only=True) + return samdb.search(base="", scope=samba.tests.ldb.SCOPE_BASE)[0] + + def test_samba_tool_bind(self): + """Tests 'samba-tool drs bind' command + Output should be like: + Extensions supported: + <list-of-supported-extensions> + Site GUID: <GUID> + Repl epoch: 0""" + out = self.check_output("samba-tool drs bind %s %s" % (self.dc1, + self.cmdline_creds)) + self.assertTrue("Site GUID:" in out) + self.assertTrue("Repl epoch:" in out) + + def test_samba_tool_kcc(self): + """Tests 'samba-tool drs kcc' command + Output should be like 'Consistency check on <DC> successful.'""" + out = self.check_output("samba-tool drs kcc %s %s" % (self.dc1, + self.cmdline_creds)) + self.assertTrue("Consistency check on" in out) + self.assertTrue("successful" in out) + + def test_samba_tool_showrepl(self): + """Tests 'samba-tool drs showrepl' command + Output should be like: + <site-name>/<domain-name> + DSA Options: <hex-options> + DSA object GUID: <DSA-object-GUID> + DSA invocationId: <DSA-invocationId> + <Inbound-connections-list> + <Outbound-connections-list> + <KCC-objects> + ... + TODO: Perhaps we should check at least for + DSA's objectGUDI and invocationId""" + out = self.check_output("samba-tool drs showrepl %s %s" % (self.dc1, + self.cmdline_creds)) + self.assertTrue("DSA Options:" in out) + self.assertTrue("DSA object GUID:" in out) + self.assertTrue("DSA invocationId:" in out) + + def test_samba_tool_options(self): + """Tests 'samba-tool drs options' command + Output should be like 'Current DSA options: IS_GC <OTHER_FLAGS>'""" + out = self.check_output("samba-tool drs options %s %s" % (self.dc1, + self.cmdline_creds)) + self.assertTrue("Current DSA options:" in out) + + def test_samba_tool_replicate(self): + """Tests 'samba-tool drs replicate' command + Output should be like 'Replicate from <DC-SRC> to <DC-DEST> was successful.'""" + nc_name = self._get_rootDSE(self.dc1)["defaultNamingContext"] + out = self.check_output("samba-tool drs replicate %s %s %s %s" % (self.dc1, + self.dc2, + nc_name, + self.cmdline_creds)) + self.assertTrue("Replicate from" in out) + self.assertTrue("was successful" in out) diff --git a/python/samba/tests/common.py b/python/samba/tests/common.py new file mode 100644 index 00000000000..8794e9dc8b3 --- /dev/null +++ b/python/samba/tests/common.py @@ -0,0 +1,40 @@ +# Unix SMB/CIFS implementation. Tests for common.py routines +# Copyright (C) Andrew Tridgell 2011 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.common""" + +import samba, os +import samba.tests +from samba.common import * +from samba.samdb import SamDB + + +class CommonTests(samba.tests.TestCase): + + def test_normalise_int32(self): + self.assertEquals('17', normalise_int32(17)) + self.assertEquals('17', normalise_int32('17')) + self.assertEquals('-123', normalise_int32('-123')) + self.assertEquals('-1294967296', normalise_int32('3000000000')) + + def test_dsdb_Dn(self): + sam = samba.Ldb(url='dntest.ldb') + dn1 = dsdb_Dn(sam, "DC=foo,DC=bar") + dn2 = dsdb_Dn(sam, "B:8:0000000D:<GUID=b3f0ec29-17f4-452a-b002-963e1909d101>;DC=samba,DC=example,DC=com") + self.assertEquals(dn2.binary, "0000000D") + self.assertEquals(13, dn2.get_binary_integer()) + os.unlink('dntest.ldb') diff --git a/python/samba/tests/core.py b/python/samba/tests/core.py new file mode 100644 index 00000000000..8206e68d4fb --- /dev/null +++ b/python/samba/tests/core.py @@ -0,0 +1,63 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007-2008 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Samba Python tests.""" + +import ldb +import os +import samba +from samba.tests import TestCase, TestCaseInTempDir + +class SubstituteVarTestCase(TestCase): + + def test_empty(self): + self.assertEquals("", samba.substitute_var("", {})) + + def test_nothing(self): + self.assertEquals("foo bar", + samba.substitute_var("foo bar", {"bar": "bla"})) + + def test_replace(self): + self.assertEquals("foo bla", + samba.substitute_var("foo ${bar}", {"bar": "bla"})) + + def test_broken(self): + self.assertEquals("foo ${bdkjfhsdkfh sdkfh ", + samba.substitute_var("foo ${bdkjfhsdkfh sdkfh ", {"bar": "bla"})) + + def test_unknown_var(self): + self.assertEquals("foo ${bla} gsff", + samba.substitute_var("foo ${bla} gsff", {"bar": "bla"})) + + def test_check_all_substituted(self): + samba.check_all_substituted("nothing to see here") + self.assertRaises(Exception, samba.check_all_substituted, + "Not subsituted: ${FOOBAR}") + + +class LdbExtensionTests(TestCaseInTempDir): + + def test_searchone(self): + path = self.tempdir + "/searchone.ldb" + l = samba.Ldb(path) + try: + l.add({"dn": "foo=dc", "bar": "bla"}) + self.assertEquals("bla", + l.searchone(basedn=ldb.Dn(l, "foo=dc"), attribute="bar")) + finally: + del l + os.unlink(path) diff --git a/python/samba/tests/credentials.py b/python/samba/tests/credentials.py new file mode 100644 index 00000000000..95ee0fa0deb --- /dev/null +++ b/python/samba/tests/credentials.py @@ -0,0 +1,98 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for the Credentials Python bindings. + +Note that this just tests the bindings work. It does not intend to test +the functionality, that's already done in other tests. +""" + +from samba import credentials +import samba.tests + +class CredentialsTests(samba.tests.TestCase): + + def setUp(self): + super(CredentialsTests, self).setUp() + self.creds = credentials.Credentials() + + def test_set_username(self): + self.creds.set_username("somebody") + self.assertEquals("somebody", self.creds.get_username()) + + def test_set_password(self): + self.creds.set_password("S3CreT") + self.assertEquals("S3CreT", self.creds.get_password()) + + def test_set_domain(self): + self.creds.set_domain("ABMAS") + self.assertEquals("ABMAS", self.creds.get_domain()) + + def test_set_realm(self): + self.creds.set_realm("myrealm") + self.assertEquals("MYREALM", self.creds.get_realm()) + + def test_parse_string_anon(self): + self.creds.parse_string("%") + self.assertEquals("", self.creds.get_username()) + self.assertEquals(None, self.creds.get_password()) + + def test_parse_string_user_pw_domain(self): + self.creds.parse_string("dom\\someone%secr") + self.assertEquals("someone", self.creds.get_username()) + self.assertEquals("secr", self.creds.get_password()) + self.assertEquals("DOM", self.creds.get_domain()) + + def test_bind_dn(self): + self.assertEquals(None, self.creds.get_bind_dn()) + self.creds.set_bind_dn("dc=foo,cn=bar") + self.assertEquals("dc=foo,cn=bar", self.creds.get_bind_dn()) + + def test_is_anon(self): + self.creds.set_username("") + self.assertTrue(self.creds.is_anonymous()) + self.creds.set_username("somebody") + self.assertFalse(self.creds.is_anonymous()) + self.creds.set_anonymous() + self.assertTrue(self.creds.is_anonymous()) + + def test_workstation(self): + # FIXME: This is uninitialised, it should be None + #self.assertEquals(None, self.creds.get_workstation()) + self.creds.set_workstation("myworksta") + self.assertEquals("myworksta", self.creds.get_workstation()) + + def test_get_nt_hash(self): + self.creds.set_password("geheim") + self.assertEquals('\xc2\xae\x1f\xe6\xe6H\x84cRE>\x81o*\xeb\x93', + self.creds.get_nt_hash()) + + def test_guess(self): + # Just check the method is there and doesn't raise an exception + self.creds.guess() + + def test_set_cmdline_callbacks(self): + self.creds.set_cmdline_callbacks() + + def test_authentication_requested(self): + self.creds.set_username("") + self.assertFalse(self.creds.authentication_requested()) + self.creds.set_username("somebody") + self.assertTrue(self.creds.authentication_requested()) + + def test_wrong_password(self): + self.assertFalse(self.creds.wrong_password()) diff --git a/python/samba/tests/dcerpc/__init__.py b/python/samba/tests/dcerpc/__init__.py new file mode 100644 index 00000000000..d84cb57a096 --- /dev/null +++ b/python/samba/tests/dcerpc/__init__.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +# +# Unix SMB/CIFS implementation. +# Copyright © Jelmer Vernooij <jelmer@samba.org> 2008 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +"""Tests for the DCE/RPC Python bindings.""" + diff --git a/python/samba/tests/dcerpc/bare.py b/python/samba/tests/dcerpc/bare.py new file mode 100644 index 00000000000..3efbf9d4cf3 --- /dev/null +++ b/python/samba/tests/dcerpc/bare.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- +# +# Unix SMB/CIFS implementation. +# Copyright © Jelmer Vernooij <jelmer@samba.org> 2008 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.tests.dcerpc.bare.""" + +from samba.dcerpc import ClientConnection +import samba.tests + +class BareTestCase(samba.tests.TestCase): + + def test_bare(self): + # Connect to the echo pipe + x = ClientConnection("ncalrpc:localhost[DEFAULT]", + ("60a15ec5-4de8-11d7-a637-005056a20182", 1), + lp_ctx=samba.tests.env_loadparm()) + self.assertEquals("\x01\x00\x00\x00", x.request(0, chr(0) * 4)) + + def test_alter_context(self): + x = ClientConnection("ncalrpc:localhost[DEFAULT]", + ("12345778-1234-abcd-ef00-0123456789ac", 1), + lp_ctx=samba.tests.env_loadparm()) + y = ClientConnection("ncalrpc:localhost", + ("60a15ec5-4de8-11d7-a637-005056a20182", 1), + basis_connection=x, lp_ctx=samba.tests.env_loadparm()) + x.alter_context(("60a15ec5-4de8-11d7-a637-005056a20182", 1)) + # FIXME: self.assertEquals("\x01\x00\x00\x00", x.request(0, chr(0) * 4)) + + def test_two_connections(self): + x = ClientConnection("ncalrpc:localhost[DEFAULT]", + ("60a15ec5-4de8-11d7-a637-005056a20182", 1), + lp_ctx=samba.tests.env_loadparm()) + y = ClientConnection("ncalrpc:localhost", + ("60a15ec5-4de8-11d7-a637-005056a20182", 1), + basis_connection=x, lp_ctx=samba.tests.env_loadparm()) + self.assertEquals("\x01\x00\x00\x00", y.request(0, chr(0) * 4)) diff --git a/python/samba/tests/dcerpc/dnsserver.py b/python/samba/tests/dcerpc/dnsserver.py new file mode 100644 index 00000000000..59d6eee7618 --- /dev/null +++ b/python/samba/tests/dcerpc/dnsserver.py @@ -0,0 +1,241 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Amitay Isaacs <amitay@gmail.com> 2011 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.dcerpc.dnsserver""" + +from samba.dcerpc import dnsp, dnsserver +from samba.tests import RpcInterfaceTestCase, env_get_var_value +from samba.netcmd.dns import ARecord + +class DnsserverTests(RpcInterfaceTestCase): + + def setUp(self): + super(DnsserverTests, self).setUp() + self.server = env_get_var_value("SERVER_IP") + self.zone = env_get_var_value("REALM").lower() + self.conn = dnsserver.dnsserver("ncacn_ip_tcp:%s" % (self.server), + self.get_loadparm(), + self.get_credentials()) + + def test_operation2(self): + pass + + + def test_query2(self): + typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_W2K, + 0, + self.server, + None, + 'ServerInfo') + self.assertEquals(dnsserver.DNSSRV_TYPEID_SERVER_INFO_W2K, typeid) + + typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_DOTNET, + 0, + self.server, + None, + 'ServerInfo') + self.assertEquals(dnsserver.DNSSRV_TYPEID_SERVER_INFO_DOTNET, typeid) + + typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, + self.server, + None, + 'ServerInfo') + self.assertEquals(dnsserver.DNSSRV_TYPEID_SERVER_INFO, typeid) + + def test_operation2(self): + client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN + rev_zone = '1.168.192.in-addr.arpa' + + zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN() + zone_create.pszZoneName = rev_zone + zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY + zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE + zone_create.fAging = 0 + zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT + + # Create zone + self.conn.DnssrvOperation2(client_version, + 0, + self.server, + None, + 0, + 'ZoneCreate', + dnsserver.DNSSRV_TYPEID_ZONE_CREATE, + zone_create) + + request_filter = (dnsserver.DNS_ZONE_REQUEST_REVERSE | + dnsserver.DNS_ZONE_REQUEST_PRIMARY) + typeid, zones = self.conn.DnssrvComplexOperation2(client_version, + 0, + self.server, + None, + 'EnumZones', + dnsserver.DNSSRV_TYPEID_DWORD, + request_filter) + self.assertEquals(1, zones.dwZoneCount) + + # Delete zone + self.conn.DnssrvOperation2(client_version, + 0, + self.server, + rev_zone, + 0, + 'DeleteZoneFromDs', + dnsserver.DNSSRV_TYPEID_NULL, + None) + + typeid, zones = self.conn.DnssrvComplexOperation2(client_version, + 0, + self.server, + None, + 'EnumZones', + dnsserver.DNSSRV_TYPEID_DWORD, + request_filter) + self.assertEquals(0, zones.dwZoneCount) + + + def test_complexoperation2(self): + client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN + request_filter = (dnsserver.DNS_ZONE_REQUEST_FORWARD | + dnsserver.DNS_ZONE_REQUEST_PRIMARY) + typeid, zones = self.conn.DnssrvComplexOperation2(client_version, + 0, + self.server, + None, + 'EnumZones', + dnsserver.DNSSRV_TYPEID_DWORD, + request_filter) + self.assertEquals(dnsserver.DNSSRV_TYPEID_ZONE_LIST, typeid) + self.assertEquals(2, zones.dwZoneCount) + + request_filter = (dnsserver.DNS_ZONE_REQUEST_REVERSE | + dnsserver.DNS_ZONE_REQUEST_PRIMARY) + typeid, zones = self.conn.DnssrvComplexOperation2(client_version, + 0, + self.server, + None, + 'EnumZones', + dnsserver.DNSSRV_TYPEID_DWORD, + request_filter) + self.assertEquals(dnsserver.DNSSRV_TYPEID_ZONE_LIST, typeid) + self.assertEquals(0, zones.dwZoneCount) + + + def test_enumrecords2(self): + client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN + record_type = dnsp.DNS_TYPE_NS + select_flags = (dnsserver.DNS_RPC_VIEW_ROOT_HINT_DATA | + dnsserver.DNS_RPC_VIEW_ADDITIONAL_DATA) + buflen, roothints = self.conn.DnssrvEnumRecords2(client_version, + 0, + self.server, + '..RootHints', + '.', + None, + record_type, + select_flags, + None, + None) + self.assertEquals(14, roothints.count) # 1 NS + 13 A records (a-m) + + + def test_updaterecords2(self): + client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN + record_type = dnsp.DNS_TYPE_A + select_flags = dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA + + name = 'dummy' + rec = ARecord('1.2.3.4') + rec2 = ARecord('5.6.7.8') + + # Add record + add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + add_rec_buf.rec = rec + self.conn.DnssrvUpdateRecord2(client_version, + 0, + self.server, + self.zone, + name, + add_rec_buf, + None) + + buflen, result = self.conn.DnssrvEnumRecords2(client_version, + 0, + self.server, + self.zone, + name, + None, + record_type, + select_flags, + None, + None) + self.assertEquals(1, result.count) + self.assertEquals(1, result.rec[0].wRecordCount) + self.assertEquals(dnsp.DNS_TYPE_A, result.rec[0].records[0].wType) + self.assertEquals('1.2.3.4', result.rec[0].records[0].data) + + # Update record + add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + add_rec_buf.rec = rec2 + del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + del_rec_buf.rec = rec + self.conn.DnssrvUpdateRecord2(client_version, + 0, + self.server, + self.zone, + name, + add_rec_buf, + del_rec_buf) + + buflen, result = self.conn.DnssrvEnumRecords2(client_version, + 0, + self.server, + self.zone, + name, + None, + record_type, + select_flags, + None, + None) + self.assertEquals(1, result.count) + self.assertEquals(1, result.rec[0].wRecordCount) + self.assertEquals(dnsp.DNS_TYPE_A, result.rec[0].records[0].wType) + self.assertEquals('5.6.7.8', result.rec[0].records[0].data) + + # Delete record + del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + del_rec_buf.rec = rec2 + self.conn.DnssrvUpdateRecord2(client_version, + 0, + self.server, + self.zone, + name, + None, + del_rec_buf) + + self.assertRaises(RuntimeError, self.conn.DnssrvEnumRecords2, + client_version, + 0, + self.server, + self.zone, + name, + None, + record_type, + select_flags, + None, + None) diff --git a/python/samba/tests/dcerpc/misc.py b/python/samba/tests/dcerpc/misc.py new file mode 100644 index 00000000000..11e14aadfa7 --- /dev/null +++ b/python/samba/tests/dcerpc/misc.py @@ -0,0 +1,62 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.dcerpc.misc.""" + +from samba.dcerpc import misc +import samba.tests + +text1 = "76f53846-a7c2-476a-ae2c-20e2b80d7b34" +text2 = "344edffa-330a-4b39-b96e-2c34da52e8b1" + +class GUIDTests(samba.tests.TestCase): + + def test_str(self): + guid = misc.GUID(text1) + self.assertEquals(text1, str(guid)) + + def test_repr(self): + guid = misc.GUID(text1) + self.assertEquals("GUID('%s')" % text1, repr(guid)) + + def test_compare_different(self): + guid1 = misc.GUID(text1) + guid2 = misc.GUID(text2) + self.assertTrue(cmp(guid1, guid2) > 0) + + def test_compare_same(self): + guid1 = misc.GUID(text1) + guid2 = misc.GUID(text1) + self.assertEquals(0, cmp(guid1, guid2)) + self.assertEquals(guid1, guid2) + + +class PolicyHandleTests(samba.tests.TestCase): + + def test_init(self): + x = misc.policy_handle(text1, 1) + self.assertEquals(1, x.handle_type) + self.assertEquals(text1, str(x.uuid)) + + def test_repr(self): + x = misc.policy_handle(text1, 42) + self.assertEquals("policy_handle(%d, '%s')" % (42, text1), repr(x)) + + def test_str(self): + x = misc.policy_handle(text1, 42) + self.assertEquals("%d, %s" % (42, text1), str(x)) + diff --git a/python/samba/tests/dcerpc/registry.py b/python/samba/tests/dcerpc/registry.py new file mode 100644 index 00000000000..c7bcbfd530a --- /dev/null +++ b/python/samba/tests/dcerpc/registry.py @@ -0,0 +1,51 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2008 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.dcerpc.registry.""" + +from samba.dcerpc import winreg +from samba.tests import RpcInterfaceTestCase + + +class WinregTests(RpcInterfaceTestCase): + + def setUp(self): + super(WinregTests, self).setUp() + self.conn = winreg.winreg("ncalrpc:", self.get_loadparm(), + self.get_credentials()) + + def get_hklm(self): + return self.conn.OpenHKLM(None, + winreg.KEY_QUERY_VALUE | winreg.KEY_ENUMERATE_SUB_KEYS) + + def test_hklm(self): + handle = self.conn.OpenHKLM(None, + winreg.KEY_QUERY_VALUE | winreg.KEY_ENUMERATE_SUB_KEYS) + self.conn.CloseKey(handle) + + def test_getversion(self): + handle = self.get_hklm() + version = self.conn.GetVersion(handle) + self.assertEquals(int, version.__class__) + self.conn.CloseKey(handle) + + def test_getkeyinfo(self): + handle = self.conn.OpenHKLM(None, + winreg.KEY_QUERY_VALUE | winreg.KEY_ENUMERATE_SUB_KEYS) + x = self.conn.QueryInfoKey(handle, winreg.String()) + self.assertEquals(9, len(x)) # should return a 9-tuple + self.conn.CloseKey(handle) diff --git a/python/samba/tests/dcerpc/rpc_talloc.py b/python/samba/tests/dcerpc/rpc_talloc.py new file mode 100644 index 00000000000..c091f26c1b9 --- /dev/null +++ b/python/samba/tests/dcerpc/rpc_talloc.py @@ -0,0 +1,84 @@ +# test generated python code from pidl +# Copyright (C) Andrew Tridgell August 2010 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +# +# to run this test, use one of these: +# +# python -m testtools.run samba.tests.dcerpc.rpc_talloc +# +# or if you have trial installed (from twisted), use +# +# trial samba.tests.dcerpc.rpc_talloc + +"""Tests for the talloc handling in the generated Python DCE/RPC bindings.""" + +import sys + +sys.path.insert(0, "bin/python") + +import samba +import samba.tests +from samba.dcerpc import drsuapi +import talloc + +talloc.enable_null_tracking() + + +class TallocTests(samba.tests.TestCase): + '''test talloc behaviour of pidl generated python code''' + + def check_blocks(self, object, num_expected): + '''check that the number of allocated blocks is correct''' + nblocks = talloc.total_blocks(object) + if object is None: + nblocks -= self.initial_blocks + self.assertEquals(nblocks, num_expected) + + def get_rodc_partial_attribute_set(self): + '''get a list of attributes for RODC replication''' + partial_attribute_set = drsuapi.DsPartialAttributeSet() + + # we expect one block for the object, and one for the structure + self.check_blocks(partial_attribute_set, 2) + + attids = [1, 2, 3] + partial_attribute_set.version = 1 + partial_attribute_set.attids = attids + partial_attribute_set.num_attids = len(attids) + + # we expect one block object, a structure, an ARRAY, and a + # reference to the array + self.check_blocks(partial_attribute_set, 3) + + return partial_attribute_set + + def pas_test(self): + pas = self.get_rodc_partial_attribute_set() + self.check_blocks(pas, 3) + req8 = drsuapi.DsGetNCChangesRequest8() + self.check_blocks(req8, 2) + self.check_blocks(None, 5) + req8.partial_attribute_set = pas + if req8.partial_attribute_set.attids[1] != 2: + raise Exception("Wrong value in attids[2]") + # we now get an additional reference + self.check_blocks(None, 6) + + def test_run(self): + self.initial_blocks = talloc.total_blocks(None) + self.check_blocks(None, 0) + self.pas_test() + self.check_blocks(None, 0) diff --git a/python/samba/tests/dcerpc/rpcecho.py b/python/samba/tests/dcerpc/rpcecho.py new file mode 100644 index 00000000000..099f8f619ce --- /dev/null +++ b/python/samba/tests/dcerpc/rpcecho.py @@ -0,0 +1,71 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2008 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.dceprc.rpcecho.""" + +from samba.dcerpc import echo +from samba.ndr import ndr_pack, ndr_unpack +from samba.tests import RpcInterfaceTestCase, TestCase + + +class RpcEchoTests(RpcInterfaceTestCase): + + def setUp(self): + super(RpcEchoTests, self).setUp() + self.conn = echo.rpcecho("ncalrpc:", self.get_loadparm()) + + def test_two_contexts(self): + self.conn2 = echo.rpcecho("ncalrpc:", self.get_loadparm(), basis_connection=self.conn) + self.assertEquals(3, self.conn2.AddOne(2)) + + def test_abstract_syntax(self): + self.assertEquals(("60a15ec5-4de8-11d7-a637-005056a20182", 1), + self.conn.abstract_syntax) + + def test_addone(self): + self.assertEquals(2, self.conn.AddOne(1)) + + def test_echodata(self): + self.assertEquals([1,2,3], self.conn.EchoData([1, 2, 3])) + + def test_call(self): + self.assertEquals(u"foobar", self.conn.TestCall(u"foobar")) + + def test_surrounding(self): + surrounding_struct = echo.Surrounding() + surrounding_struct.x = 4 + surrounding_struct.surrounding = [1,2,3,4] + y = self.conn.TestSurrounding(surrounding_struct) + self.assertEquals(8 * [0], y.surrounding) + + def test_manual_request(self): + self.assertEquals("\x01\x00\x00\x00", self.conn.request(0, chr(0) * 4)) + + def test_server_name(self): + self.assertEquals(None, self.conn.server_name) + + +class NdrEchoTests(TestCase): + + def test_info1_push(self): + x = echo.info1() + x.v = 42 + self.assertEquals("\x2a", ndr_pack(x)) + + def test_info1_pull(self): + x = ndr_unpack(echo.info1, "\x42") + self.assertEquals(x.v, 66) diff --git a/python/samba/tests/dcerpc/sam.py b/python/samba/tests/dcerpc/sam.py new file mode 100644 index 00000000000..0e09323adbe --- /dev/null +++ b/python/samba/tests/dcerpc/sam.py @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- +# +# Unix SMB/CIFS implementation. +# Copyright © Jelmer Vernooij <jelmer@samba.org> 2008 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.dcerpc.sam.""" + +from samba.dcerpc import samr, security +from samba.tests import RpcInterfaceTestCase + +# FIXME: Pidl should be doing this for us +def toArray((handle, array, num_entries)): + ret = [] + for x in range(num_entries): + ret.append((array.entries[x].idx, array.entries[x].name)) + return ret + + +class SamrTests(RpcInterfaceTestCase): + + def setUp(self): + super(SamrTests, self).setUp() + self.conn = samr.samr("ncalrpc:", self.get_loadparm()) + + def test_connect5(self): + (level, info, handle) = self.conn.Connect5(None, 0, 1, samr.ConnectInfo1()) + + def test_connect2(self): + handle = self.conn.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED) + self.assertTrue(handle is not None) + + def test_EnumDomains(self): + handle = self.conn.Connect2(None, security.SEC_FLAG_MAXIMUM_ALLOWED) + domains = toArray(self.conn.EnumDomains(handle, 0, -1)) + self.conn.Close(handle) + diff --git a/python/samba/tests/dcerpc/srvsvc.py b/python/samba/tests/dcerpc/srvsvc.py new file mode 100644 index 00000000000..3206a27e678 --- /dev/null +++ b/python/samba/tests/dcerpc/srvsvc.py @@ -0,0 +1,68 @@ +# -*- coding: utf-8 -*- +# +# Unix SMB/CIFS implementation. +# Copyright © Dhananjay Sathe <dhanajaysathe@gmail.com> 2011 +# Copyright © Jelmer Vernooij <jelmer@samba.org> 2011 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.dcerpc.srvsvc.""" + +from samba.dcerpc import srvsvc +from samba.tests import RpcInterfaceTestCase + + +class SrvsvcTests(RpcInterfaceTestCase): + + def setUp(self): + super(SrvsvcTests, self).setUp() + self.conn = srvsvc.srvsvc("ncalrpc:", self.get_loadparm()) + self.server_unc = "\\\\." + + def getDummyShareObject(self): + share = srvsvc.NetShareInfo2() + + share.name = u'test' + share.comment = u'test share' + share.type = srvsvc.STYPE_DISKTREE + share.current_users = 0x00000000 + share.max_users = -1 + share.password = None + share.path = u'C:\\tmp' # some random path + share.permissions = 123434566 + return share + + def test_NetShareAdd(self): + self.skip("Dangerous test") + share = self.getDummyShareObject() + self.conn.NetShareAdd(self.server_unc, 2, share, None) + + def test_NetShareSetInfo(self): + self.skip("Dangerous test") + share = self.getDummyShareObject() + parm_error = 0x00000000 + self.conn.NetShareAdd(self.server_unc, 502, share, parm_error) + name = share.name + share.comment = "now sucessfully modified " + parm_error = self.pipe.NetShareSetInfo(self.server_unc, name, + 502, share, parm_error) + + def test_NetShareDel(self): + self.skip("Dangerous test") + share = self.getDummyShareObject() + parm_error = 0x00000000 + self.expectFailure("NetShareAdd doesn't work properly from Python", + self.conn.NetShareAdd, self.server_unc, 502, share, parm_error) + self.conn.NetShareDel(self.server_unc, share.name, 0) diff --git a/python/samba/tests/dcerpc/testrpc.py b/python/samba/tests/dcerpc/testrpc.py new file mode 100644 index 00000000000..e35d6b55446 --- /dev/null +++ b/python/samba/tests/dcerpc/testrpc.py @@ -0,0 +1,141 @@ +# test generated python code from pidl +# Copyright (C) Andrew Tridgell August 2010 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +import sys + +sys.path.insert(0, "bin/python") + +import samba +import samba.tests +from samba.dcerpc import drsuapi +import talloc + +talloc.enable_null_tracking() + +class RpcTests(object): + '''test type behaviour of pidl generated python RPC code''' + + def check_blocks(self, object, num_expected): + '''check that the number of allocated blocks is correct''' + nblocks = talloc.total_blocks(object) + if object is None: + nblocks -= self.initial_blocks + leaked_blocks = (nblocks - num_expected) + if leaked_blocks != 0: + print "Leaked %d blocks" % leaked_blocks + + def check_type(self, interface, typename, type): + print "Checking type %s" % typename + v = type() + for n in dir(v): + if n[0] == '_': + continue + try: + value = getattr(v, n) + except TypeError, errstr: + if str(errstr) == "unknown union level": + print "ERROR: Unknown union level in %s.%s" % (typename, n) + self.errcount += 1 + continue + print str(errstr)[1:21] + if str(errstr)[0:21] == "Can not convert C Type": + print "ERROR: Unknown C type for %s.%s" % (typename, n) + self.errcount += 1 + continue + else: + print "ERROR: Failed to instantiate %s.%s" % (typename, n) + self.errcount += 1 + continue + except Exception: + print "ERROR: Failed to instantiate %s.%s" % (typename, n) + self.errcount += 1 + continue + + # now try setting the value back + try: + print "Setting %s.%s" % (typename, n) + setattr(v, n, value) + except Exception, e: + if isinstance(e, AttributeError) and str(e).endswith("is read-only"): + # readonly, ignore + continue + else: + print "ERROR: Failed to set %s.%s: %r: %s" % (typename, n, e.__class__, e) + self.errcount += 1 + continue + + # and try a comparison + try: + if value != getattr(v, n): + print "ERROR: Comparison failed for %s.%s: %r != %r" % (typename, n, value, getattr(v, n)) + continue + except Exception, e: + print "ERROR: compare exception for %s.%s: %r: %s" % (typename, n, e.__class__, e) + continue + + def check_interface(self, interface, iname): + errcount = self.errcount + for n in dir(interface): + if n[0] == '_' or n == iname: + # skip the special ones + continue + value = getattr(interface, n) + if isinstance(value, str): + #print "%s=\"%s\"" % (n, value) + pass + elif isinstance(value, int) or isinstance(value, long): + #print "%s=%d" % (n, value) + pass + elif isinstance(value, type): + try: + initial_blocks = talloc.total_blocks(None) + self.check_type(interface, n, value) + self.check_blocks(None, initial_blocks) + except Exception, e: + print "ERROR: Failed to check_type %s.%s: %r: %s" % (iname, n, e.__class__, e) + self.errcount += 1 + elif callable(value): + pass # Method + else: + print "UNKNOWN: %s=%s" % (n, value) + if self.errcount - errcount != 0: + print "Found %d errors in %s" % (self.errcount - errcount, iname) + + def check_all_interfaces(self): + for iname in dir(samba.dcerpc): + if iname[0] == '_': + continue + if iname == 'ClientConnection' or iname == 'base': + continue + print "Checking interface %s" % iname + iface = getattr(samba.dcerpc, iname) + initial_blocks = talloc.total_blocks(None) + self.check_interface(iface, iname) + self.check_blocks(None, initial_blocks) + + def run(self): + self.initial_blocks = talloc.total_blocks(None) + self.errcount = 0 + self.check_all_interfaces() + return self.errcount + +tests = RpcTests() +errcount = tests.run() +if errcount == 0: + sys.exit(0) +else: + print "%d failures" % errcount + sys.exit(1) diff --git a/python/samba/tests/dcerpc/unix.py b/python/samba/tests/dcerpc/unix.py new file mode 100644 index 00000000000..e8ef4da8630 --- /dev/null +++ b/python/samba/tests/dcerpc/unix.py @@ -0,0 +1,49 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2008 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.dcerpc.unixinfo.""" + + +from samba.dcerpc import unixinfo +from samba.tests import RpcInterfaceTestCase + +class UnixinfoTests(RpcInterfaceTestCase): + + def setUp(self): + super(UnixinfoTests, self).setUp() + self.conn = unixinfo.unixinfo("ncalrpc:", self.get_loadparm()) + + def test_getpwuid_int(self): + infos = self.conn.GetPWUid(range(512)) + self.assertEquals(512, len(infos)) + self.assertEquals("/bin/false", infos[0].shell) + self.assertTrue(isinstance(infos[0].homedir, unicode)) + + def test_getpwuid(self): + infos = self.conn.GetPWUid(map(long, range(512))) + self.assertEquals(512, len(infos)) + self.assertEquals("/bin/false", infos[0].shell) + self.assertTrue(isinstance(infos[0].homedir, unicode)) + + def test_gidtosid(self): + self.conn.GidToSid(1000L) + + def test_uidtosid(self): + self.conn.UidToSid(1000) + + def test_uidtosid_fail(self): + self.assertRaises(TypeError, self.conn.UidToSid, "100") diff --git a/python/samba/tests/dns.py b/python/samba/tests/dns.py new file mode 100644 index 00000000000..49d699edb78 --- /dev/null +++ b/python/samba/tests/dns.py @@ -0,0 +1,622 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Kai Blin <kai@samba.org> 2011 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import os +import struct +import random +from samba import socket +import samba.ndr as ndr +import samba.dcerpc.dns as dns +from samba.tests import TestCase + +class DNSTest(TestCase): + + def errstr(self, errcode): + "Return a readable error code" + string_codes = [ + "OK", + "FORMERR", + "SERVFAIL", + "NXDOMAIN", + "NOTIMP", + "REFUSED", + "YXDOMAIN", + "YXRRSET", + "NXRRSET", + "NOTAUTH", + "NOTZONE", + ] + + return string_codes[errcode] + + + def assert_dns_rcode_equals(self, packet, rcode): + "Helper function to check return code" + p_errcode = packet.operation & 0x000F + self.assertEquals(p_errcode, rcode, "Expected RCODE %s, got %s" % + (self.errstr(rcode), self.errstr(p_errcode))) + + def assert_dns_opcode_equals(self, packet, opcode): + "Helper function to check opcode" + p_opcode = packet.operation & 0x7800 + self.assertEquals(p_opcode, opcode, "Expected OPCODE %s, got %s" % + (opcode, p_opcode)) + + def make_name_packet(self, opcode, qid=None): + "Helper creating a dns.name_packet" + p = dns.name_packet() + if qid is None: + p.id = random.randint(0x0, 0xffff) + p.operation = opcode + p.questions = [] + return p + + def finish_name_packet(self, packet, questions): + "Helper to finalize a dns.name_packet" + packet.qdcount = len(questions) + packet.questions = questions + + def make_name_question(self, name, qtype, qclass): + "Helper creating a dns.name_question" + q = dns.name_question() + q.name = name + q.question_type = qtype + q.question_class = qclass + return q + + def get_dns_domain(self): + "Helper to get dns domain" + return os.getenv('REALM', 'example.com').lower() + + def dns_transaction_udp(self, packet, host=os.getenv('SERVER_IP')): + "send a DNS query and read the reply" + s = None + try: + send_packet = ndr.ndr_pack(packet) + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) + s.connect((host, 53)) + s.send(send_packet, 0) + recv_packet = s.recv(2048, 0) + return ndr.ndr_unpack(dns.name_packet, recv_packet) + finally: + if s is not None: + s.close() + + def dns_transaction_tcp(self, packet, host=os.getenv('SERVER_IP')): + "send a DNS query and read the reply" + s = None + try: + send_packet = ndr.ndr_pack(packet) + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) + s.connect((host, 53)) + tcp_packet = struct.pack('!H', len(send_packet)) + tcp_packet += send_packet + s.send(tcp_packet, 0) + recv_packet = s.recv(0xffff + 2, 0) + return ndr.ndr_unpack(dns.name_packet, recv_packet[2:]) + finally: + if s is not None: + s.close() + + +class TestSimpleQueries(DNSTest): + + def test_one_a_query(self): + "create a query packet containing one query record" + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain()) + q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN) + print "asking for ", q.name + questions.append(q) + + self.finish_name_packet(p, questions) + response = self.dns_transaction_udp(p) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) + self.assertEquals(response.ancount, 1) + self.assertEquals(response.answers[0].rdata, + os.getenv('SERVER_IP')) + + def test_one_a_query_tcp(self): + "create a query packet containing one query record via TCP" + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain()) + q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN) + print "asking for ", q.name + questions.append(q) + + self.finish_name_packet(p, questions) + response = self.dns_transaction_tcp(p) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) + self.assertEquals(response.ancount, 1) + self.assertEquals(response.answers[0].rdata, + os.getenv('SERVER_IP')) + + def test_two_queries(self): + "create a query packet containing two query records" + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain()) + q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN) + questions.append(q) + + name = "%s.%s" % ('bogusname', self.get_dns_domain()) + q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN) + questions.append(q) + + self.finish_name_packet(p, questions) + response = self.dns_transaction_udp(p) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR) + + def test_qtype_all_query(self): + "create a QTYPE_ALL query" + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain()) + q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN) + print "asking for ", q.name + questions.append(q) + + self.finish_name_packet(p, questions) + response = self.dns_transaction_udp(p) + + num_answers = 1 + dc_ipv6 = os.getenv('SERVER_IPV6') + if dc_ipv6 is not None: + num_answers += 1 + + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) + self.assertEquals(response.ancount, num_answers) + self.assertEquals(response.answers[0].rdata, + os.getenv('SERVER_IP')) + if dc_ipv6 is not None: + self.assertEquals(response.answers[1].rdata, dc_ipv6) + + def test_qclass_none_query(self): + "create a QCLASS_NONE query" + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain()) + q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE) + questions.append(q) + + self.finish_name_packet(p, questions) + response = self.dns_transaction_udp(p) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP) + +# Only returns an authority section entry in BIND and Win DNS +# FIXME: Enable one Samba implements this feature +# def test_soa_hostname_query(self): +# "create a SOA query for a hostname" +# p = self.make_name_packet(dns.DNS_OPCODE_QUERY) +# questions = [] +# +# name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain()) +# q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) +# questions.append(q) +# +# self.finish_name_packet(p, questions) +# response = self.dns_transaction_udp(p) +# self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) +# self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) +# # We don't get SOA records for single hosts +# self.assertEquals(response.ancount, 0) + + def test_soa_domain_query(self): + "create a SOA query for a domain" + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + name = self.get_dns_domain() + q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) + questions.append(q) + + self.finish_name_packet(p, questions) + response = self.dns_transaction_udp(p) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) + self.assertEquals(response.ancount, 1) + + +class TestDNSUpdates(DNSTest): + + def test_two_updates(self): + "create two update requests" + p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) + updates = [] + + name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain()) + u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN) + updates.append(u) + + name = self.get_dns_domain() + u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN) + updates.append(u) + + self.finish_name_packet(p, updates) + response = self.dns_transaction_udp(p) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR) + + def test_update_wrong_qclass(self): + "create update with DNS_QCLASS_NONE" + p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) + updates = [] + + name = self.get_dns_domain() + u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE) + updates.append(u) + + self.finish_name_packet(p, updates) + response = self.dns_transaction_udp(p) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP) + + def test_update_prereq_with_non_null_ttl(self): + "test update with a non-null TTL" + p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) + updates = [] + + name = self.get_dns_domain() + + u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) + updates.append(u) + self.finish_name_packet(p, updates) + + prereqs = [] + r = dns.res_rec() + r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain()) + r.rr_type = dns.DNS_QTYPE_TXT + r.rr_class = dns.DNS_QCLASS_NONE + r.ttl = 1 + r.length = 0 + prereqs.append(r) + + p.ancount = len(prereqs) + p.answers = prereqs + + response = self.dns_transaction_udp(p) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR) + +# I'd love to test this one, but it segfaults. :) +# def test_update_prereq_with_non_null_length(self): +# "test update with a non-null length" +# p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) +# updates = [] +# +# name = self.get_dns_domain() +# +# u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) +# updates.append(u) +# self.finish_name_packet(p, updates) +# +# prereqs = [] +# r = dns.res_rec() +# r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain()) +# r.rr_type = dns.DNS_QTYPE_TXT +# r.rr_class = dns.DNS_QCLASS_ANY +# r.ttl = 0 +# r.length = 1 +# prereqs.append(r) +# +# p.ancount = len(prereqs) +# p.answers = prereqs +# +# response = self.dns_transaction_udp(p) +# self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR) + + def test_update_prereq_nonexisting_name(self): + "test update with a nonexisting name" + p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) + updates = [] + + name = self.get_dns_domain() + + u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) + updates.append(u) + self.finish_name_packet(p, updates) + + prereqs = [] + r = dns.res_rec() + r.name = "idontexist.%s" % self.get_dns_domain() + r.rr_type = dns.DNS_QTYPE_TXT + r.rr_class = dns.DNS_QCLASS_ANY + r.ttl = 0 + r.length = 0 + prereqs.append(r) + + p.ancount = len(prereqs) + p.answers = prereqs + + response = self.dns_transaction_udp(p) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET) + + def test_update_add_txt_record(self): + "test adding records works" + p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) + updates = [] + + name = self.get_dns_domain() + + u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) + updates.append(u) + self.finish_name_packet(p, updates) + + updates = [] + r = dns.res_rec() + r.name = "textrec.%s" % self.get_dns_domain() + r.rr_type = dns.DNS_QTYPE_TXT + r.rr_class = dns.DNS_QCLASS_IN + r.ttl = 900 + r.length = 0xffff + r.rdata = dns.txt_record() + r.rdata.txt = '"This is a test"' + updates.append(r) + p.nscount = len(updates) + p.nsrecs = updates + + response = self.dns_transaction_udp(p) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + name = "textrec.%s" % self.get_dns_domain() + q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN) + questions.append(q) + + self.finish_name_packet(p, questions) + response = self.dns_transaction_udp(p) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.assertEquals(response.ancount, 1) + self.assertEquals(response.answers[0].rdata.txt, '"This is a test"') + + def test_update_add_two_txt_records(self): + "test adding two txt records works" + p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) + updates = [] + + name = self.get_dns_domain() + + u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) + updates.append(u) + self.finish_name_packet(p, updates) + + updates = [] + r = dns.res_rec() + r.name = "textrec2.%s" % self.get_dns_domain() + r.rr_type = dns.DNS_QTYPE_TXT + r.rr_class = dns.DNS_QCLASS_IN + r.ttl = 900 + r.length = 0xffff + r.rdata = dns.txt_record() + r.rdata.txt = '"This is a test" "and this is a test, too"' + updates.append(r) + p.nscount = len(updates) + p.nsrecs = updates + + response = self.dns_transaction_udp(p) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + name = "textrec2.%s" % self.get_dns_domain() + q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN) + questions.append(q) + + self.finish_name_packet(p, questions) + response = self.dns_transaction_udp(p) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.assertEquals(response.ancount, 1) + self.assertEquals(response.answers[0].rdata.txt, '"This is a test" "and this is a test, too"') + + def test_delete_record(self): + "Test if deleting records works" + + NAME = "deleterec.%s" % self.get_dns_domain() + + # First, create a record to make sure we have a record to delete. + p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) + updates = [] + + name = self.get_dns_domain() + + u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) + updates.append(u) + self.finish_name_packet(p, updates) + + updates = [] + r = dns.res_rec() + r.name = NAME + r.rr_type = dns.DNS_QTYPE_TXT + r.rr_class = dns.DNS_QCLASS_IN + r.ttl = 900 + r.length = 0xffff + r.rdata = dns.txt_record() + r.rdata.txt = '"This is a test"' + updates.append(r) + p.nscount = len(updates) + p.nsrecs = updates + + response = self.dns_transaction_udp(p) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + + # Now check the record is around + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN) + questions.append(q) + + self.finish_name_packet(p, questions) + response = self.dns_transaction_udp(p) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + + # Now delete the record + p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) + updates = [] + + name = self.get_dns_domain() + + u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) + updates.append(u) + self.finish_name_packet(p, updates) + + updates = [] + r = dns.res_rec() + r.name = NAME + r.rr_type = dns.DNS_QTYPE_TXT + r.rr_class = dns.DNS_QCLASS_NONE + r.ttl = 0 + r.length = 0xffff + r.rdata = dns.txt_record() + r.rdata.txt = '"This is a test"' + updates.append(r) + p.nscount = len(updates) + p.nsrecs = updates + + response = self.dns_transaction_udp(p) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + + # And finally check it's gone + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN) + questions.append(q) + + self.finish_name_packet(p, questions) + response = self.dns_transaction_udp(p) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN) + + +class TestComplexQueries(DNSTest): + + def setUp(self): + super(TestComplexQueries, self).setUp() + p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) + updates = [] + + name = self.get_dns_domain() + + u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) + updates.append(u) + self.finish_name_packet(p, updates) + + updates = [] + r = dns.res_rec() + r.name = "cname_test.%s" % self.get_dns_domain() + r.rr_type = dns.DNS_QTYPE_CNAME + r.rr_class = dns.DNS_QCLASS_IN + r.ttl = 900 + r.length = 0xffff + r.rdata = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain()) + updates.append(r) + p.nscount = len(updates) + p.nsrecs = updates + + response = self.dns_transaction_udp(p) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + + def tearDown(self): + super(TestComplexQueries, self).tearDown() + p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) + updates = [] + + name = self.get_dns_domain() + + u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) + updates.append(u) + self.finish_name_packet(p, updates) + + updates = [] + r = dns.res_rec() + r.name = "cname_test.%s" % self.get_dns_domain() + r.rr_type = dns.DNS_QTYPE_CNAME + r.rr_class = dns.DNS_QCLASS_NONE + r.ttl = 0 + r.length = 0xffff + r.rdata = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain()) + updates.append(r) + p.nscount = len(updates) + p.nsrecs = updates + + response = self.dns_transaction_udp(p) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + + def test_one_a_query(self): + "create a query packet containing one query record" + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + name = "cname_test.%s" % self.get_dns_domain() + q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN) + print "asking for ", q.name + questions.append(q) + + self.finish_name_packet(p, questions) + response = self.dns_transaction_udp(p) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) + self.assertEquals(response.ancount, 2) + self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME) + self.assertEquals(response.answers[0].rdata, "%s.%s" % + (os.getenv('SERVER'), self.get_dns_domain())) + self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A) + self.assertEquals(response.answers[1].rdata, + os.getenv('SERVER_IP')) + +class TestInvalidQueries(DNSTest): + + def test_one_a_query(self): + "send 0 bytes follows by create a query packet containing one query record" + + s = None + try: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) + s.connect((os.getenv('SERVER_IP'), 53)) + s.send("", 0) + finally: + if s is not None: + s.close() + + p = self.make_name_packet(dns.DNS_OPCODE_QUERY) + questions = [] + + name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain()) + q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN) + print "asking for ", q.name + questions.append(q) + + self.finish_name_packet(p, questions) + response = self.dns_transaction_udp(p) + self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) + self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY) + self.assertEquals(response.ancount, 1) + self.assertEquals(response.answers[0].rdata, + os.getenv('SERVER_IP')) + +if __name__ == "__main__": + import unittest + unittest.main() diff --git a/python/samba/tests/docs.py b/python/samba/tests/docs.py new file mode 100644 index 00000000000..c1b371680d7 --- /dev/null +++ b/python/samba/tests/docs.py @@ -0,0 +1,127 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007-2012 +# +# Tests for documentation. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for presence of documentation.""" + +import samba +import samba.tests +from samba.tests import TestSkipped + +import errno +import os +import re +import subprocess + + +class TestCase(samba.tests.TestCase): + + def _format_message(self, parameters, message): + parameters = list(parameters) + parameters.sort() + return message + '\n\n %s' % ('\n '.join(parameters)) + + +class NoXsltProc(Exception): + + def __init__(self): + Exception.__init__(self, "'xsltproc' is not installed") + + +def get_documented_parameters(sourcedir): + path = os.path.join(sourcedir, "bin", "default", "docs-xml", "smbdotconf") + if not os.path.exists(os.path.join(path, "parameters.all.xml")): + raise Exception("Unable to find parameters.all.xml") + try: + p = subprocess.Popen( + ["xsltproc", "--xinclude", "--param", "smb.context", "ALL", os.path.join(sourcedir, "docs-xml", "smbdotconf", "generate-context.xsl"), "parameters.all.xml"], + stderr=subprocess.STDOUT, stdout=subprocess.PIPE, + cwd=path) + except OSError, e: + if e.errno == errno.ENOENT: + raise NoXsltProc() + raise + out, err = p.communicate() + assert p.returncode == 0, "returncode was %r" % p.returncode + for l in out.splitlines(): + m = re.match('<samba:parameter .*?name="([^"]*?)"', l) + if "removed=\"1\"" in l: + continue + if m: + name = m.group(1) + yield name + m = re.match('.*<synonym>(.*)</synonym>.*', l) + if m: + name = m.group(1) + yield name + + +def get_implementation_parameters(sourcedir): + # Reading entries from source code + f = open(os.path.join(sourcedir, "lib/param/param_table.c"), "r") + try: + # burn through the preceding lines + while True: + l = f.readline() + if l.startswith("static struct parm_struct parm_table"): + break + + for l in f.readlines(): + if re.match("^\s*\}\;\s*$", l): + break + # pull in the param names only + if re.match(".*P_SEPARATOR.*", l): + continue + m = re.match("\s*\.label\s*=\s*\"(.*)\".*", l) + if not m: + continue + + name = m.group(1) + yield name + finally: + f.close() + + +class SmbDotConfTests(TestCase): + + def test_unknown(self): + topdir = samba.source_tree_topdir() + try: + documented = set(get_documented_parameters(topdir)) + except NoXsltProc: + raise TestSkipped("'xsltproc' is missing, unable to load parameters") + parameters = set(get_implementation_parameters(topdir)) + # Filter out parametric options, since we can't find them in the parm + # table + documented = set([p for p in documented if not ":" in p]) + unknown = documented.difference(parameters) + if len(unknown) > 0: + self.fail(self._format_message(unknown, + "Parameters that are documented but not in the implementation:")) + + def test_undocumented(self): + topdir = samba.source_tree_topdir() + try: + documented = set(get_documented_parameters(topdir)) + except NoXsltProc: + raise TestSkipped("'xsltproc' is missing, unable to load parameters") + parameters = set(get_implementation_parameters(topdir)) + undocumented = parameters.difference(documented) + if len(undocumented) > 0: + self.fail(self._format_message(undocumented, + "Parameters that are in the implementation but undocumented:")) diff --git a/python/samba/tests/dsdb.py b/python/samba/tests/dsdb.py new file mode 100644 index 00000000000..3aef1d2fa48 --- /dev/null +++ b/python/samba/tests/dsdb.py @@ -0,0 +1,130 @@ +# Unix SMB/CIFS implementation. Tests for dsdb +# Copyright (C) Matthieu Patou <mat@matws.net> 2010 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.dsdb.""" + +from samba.credentials import Credentials +from samba.samdb import SamDB +from samba.auth import system_session +from samba.tests import TestCase +from samba.ndr import ndr_unpack, ndr_pack +from samba.dcerpc import drsblobs +import ldb +import os +import samba + + +class DsdbTests(TestCase): + + def setUp(self): + super(DsdbTests, self).setUp() + self.lp = samba.param.LoadParm() + self.lp.load(os.path.join(os.path.join(self.baseprovpath(), "etc"), "smb.conf")) + self.creds = Credentials() + self.creds.guess(self.lp) + self.session = system_session() + self.samdb = SamDB(os.path.join(self.baseprovpath(), "private", "sam.ldb"), + session_info=self.session, credentials=self.creds,lp=self.lp) + + def baseprovpath(self): + return os.path.join(os.environ['SELFTEST_PREFIX'], "dc") + + def test_get_oid_from_attrid(self): + oid = self.samdb.get_oid_from_attid(591614) + self.assertEquals(oid, "1.2.840.113556.1.4.1790") + + def test_error_replpropertymetadata(self): + res = self.samdb.search(expression="cn=Administrator", + scope=ldb.SCOPE_SUBTREE, + attrs=["replPropertyMetaData"]) + repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, + str(res[0]["replPropertyMetaData"])) + ctr = repl.ctr + for o in ctr.array: + # Search for Description + if o.attid == 13: + old_version = o.version + o.version = o.version + 1 + replBlob = ndr_pack(repl) + msg = ldb.Message() + msg.dn = res[0].dn + msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData") + self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"]) + + def test_twoatt_replpropertymetadata(self): + res = self.samdb.search(expression="cn=Administrator", + scope=ldb.SCOPE_SUBTREE, + attrs=["replPropertyMetaData", "uSNChanged"]) + repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, + str(res[0]["replPropertyMetaData"])) + ctr = repl.ctr + for o in ctr.array: + # Search for Description + if o.attid == 13: + old_version = o.version + o.version = o.version + 1 + o.local_usn = long(str(res[0]["uSNChanged"])) + 1 + replBlob = ndr_pack(repl) + msg = ldb.Message() + msg.dn = res[0].dn + msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData") + msg["description"] = ldb.MessageElement("new val", ldb.FLAG_MOD_REPLACE, "description") + self.assertRaises(ldb.LdbError, self.samdb.modify, msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"]) + + def test_set_replpropertymetadata(self): + res = self.samdb.search(expression="cn=Administrator", + scope=ldb.SCOPE_SUBTREE, + attrs=["replPropertyMetaData", "uSNChanged"]) + repl = ndr_unpack(drsblobs.replPropertyMetaDataBlob, + str(res[0]["replPropertyMetaData"])) + ctr = repl.ctr + for o in ctr.array: + # Search for Description + if o.attid == 13: + old_version = o.version + o.version = o.version + 1 + o.local_usn = long(str(res[0]["uSNChanged"])) + 1 + o.originating_usn = long(str(res[0]["uSNChanged"])) + 1 + replBlob = ndr_pack(repl) + msg = ldb.Message() + msg.dn = res[0].dn + msg["replPropertyMetaData"] = ldb.MessageElement(replBlob, ldb.FLAG_MOD_REPLACE, "replPropertyMetaData") + self.samdb.modify(msg, ["local_oid:1.3.6.1.4.1.7165.4.3.14:0"]) + + def test_ok_get_attribute_from_attid(self): + self.assertEquals(self.samdb.get_attribute_from_attid(13), "description") + + def test_ko_get_attribute_from_attid(self): + self.assertEquals(self.samdb.get_attribute_from_attid(11979), None) + + def test_get_attribute_replmetadata_version(self): + res = self.samdb.search(expression="cn=Administrator", + scope=ldb.SCOPE_SUBTREE, + attrs=["dn"]) + self.assertEquals(len(res), 1) + dn = str(res[0].dn) + self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "unicodePwd"), 1) + + def test_set_attribute_replmetadata_version(self): + res = self.samdb.search(expression="cn=Administrator", + scope=ldb.SCOPE_SUBTREE, + attrs=["dn"]) + self.assertEquals(len(res), 1) + dn = str(res[0].dn) + version = self.samdb.get_attribute_replmetadata_version(dn, "description") + self.samdb.set_attribute_replmetadata_version(dn, "description", version + 2) + self.assertEqual(self.samdb.get_attribute_replmetadata_version(dn, "description"), version + 2) diff --git a/python/samba/tests/gensec.py b/python/samba/tests/gensec.py new file mode 100644 index 00000000000..e270c418ea6 --- /dev/null +++ b/python/samba/tests/gensec.py @@ -0,0 +1,146 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2009 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for GENSEC. + +Note that this just tests the bindings work. It does not intend to test +the functionality, that's already done in other tests. +""" + +from samba.credentials import Credentials +from samba import gensec, auth +import samba.tests + +class GensecTests(samba.tests.TestCase): + + def setUp(self): + super(GensecTests, self).setUp() + self.settings = {} + self.settings["lp_ctx"] = self.lp_ctx = samba.tests.env_loadparm() + self.settings["target_hostname"] = self.lp_ctx.get("netbios name") + """This is just for the API tests""" + self.gensec = gensec.Security.start_client(self.settings) + + def test_start_mech_by_unknown_name(self): + self.assertRaises(RuntimeError, self.gensec.start_mech_by_name, "foo") + + def test_start_mech_by_name(self): + self.gensec.start_mech_by_name("spnego") + + def test_info_uninitialized(self): + self.assertRaises(RuntimeError, self.gensec.session_info) + + def test_update(self): + """Test GENSEC by doing an exchange with ourselves using GSSAPI against a KDC""" + + """Start up a client and server GENSEC instance to test things with""" + + self.gensec_client = gensec.Security.start_client(self.settings) + self.gensec_client.set_credentials(self.get_credentials()) + self.gensec_client.want_feature(gensec.FEATURE_SEAL) + self.gensec_client.start_mech_by_sasl_name("GSSAPI") + + self.gensec_server = gensec.Security.start_server(settings=self.settings, + auth_context=auth.AuthContext(lp_ctx=self.lp_ctx)) + creds = Credentials() + creds.guess(self.lp_ctx) + creds.set_machine_account(self.lp_ctx) + self.gensec_server.set_credentials(creds) + + self.gensec_server.want_feature(gensec.FEATURE_SEAL) + self.gensec_server.start_mech_by_sasl_name("GSSAPI") + + client_finished = False + server_finished = False + server_to_client = "" + + """Run the actual call loop""" + while not client_finished and not server_finished: + if not client_finished: + print "running client gensec_update" + (client_finished, client_to_server) = self.gensec_client.update(server_to_client) + if not server_finished: + print "running server gensec_update" + (server_finished, server_to_client) = self.gensec_server.update(client_to_server) + session_info = self.gensec_server.session_info() + + test_string = "Hello Server" + test_wrapped = self.gensec_client.wrap(test_string) + test_unwrapped = self.gensec_server.unwrap(test_wrapped) + self.assertEqual(test_string, test_unwrapped) + test_string = "Hello Client" + test_wrapped = self.gensec_server.wrap(test_string) + test_unwrapped = self.gensec_client.unwrap(test_wrapped) + self.assertEqual(test_string, test_unwrapped) + + client_session_key = self.gensec_client.session_key() + server_session_key = self.gensec_server.session_key() + self.assertEqual(client_session_key, server_session_key) + + def test_max_update_size(self): + """Test GENSEC by doing an exchange with ourselves using GSSAPI against a KDC""" + + """Start up a client and server GENSEC instance to test things with""" + + self.gensec_client = gensec.Security.start_client(self.settings) + self.gensec_client.set_credentials(self.get_credentials()) + self.gensec_client.want_feature(gensec.FEATURE_SIGN) + self.gensec_client.set_max_update_size(5) + self.gensec_client.start_mech_by_name("spnego") + + self.gensec_server = gensec.Security.start_server(settings=self.settings, + auth_context=auth.AuthContext(lp_ctx=self.lp_ctx)) + creds = Credentials() + creds.guess(self.lp_ctx) + creds.set_machine_account(self.lp_ctx) + self.gensec_server.set_credentials(creds) + self.gensec_server.want_feature(gensec.FEATURE_SIGN) + self.gensec_server.set_max_update_size(5) + self.gensec_server.start_mech_by_name("spnego") + + client_finished = False + server_finished = False + server_to_client = "" + + """Run the actual call loop""" + i = 0 + while not client_finished or not server_finished: + i += 1 + if not client_finished: + print "running client gensec_update: %d: %r" % (len(server_to_client), server_to_client) + (client_finished, client_to_server) = self.gensec_client.update(server_to_client) + if not server_finished: + print "running server gensec_update: %d: %r" % (len(client_to_server), client_to_server) + (server_finished, server_to_client) = self.gensec_server.update(client_to_server) + + """Here we expect a lot more than the typical 1 or 2 roundtrips""" + self.assertTrue(i > 10) + + session_info = self.gensec_server.session_info() + + test_string = "Hello Server" + test_wrapped = self.gensec_client.wrap(test_string) + test_unwrapped = self.gensec_server.unwrap(test_wrapped) + self.assertEqual(test_string, test_unwrapped) + test_string = "Hello Client" + test_wrapped = self.gensec_server.wrap(test_string) + test_unwrapped = self.gensec_client.unwrap(test_wrapped) + self.assertEqual(test_string, test_unwrapped) + + client_session_key = self.gensec_client.session_key() + server_session_key = self.gensec_server.session_key() + self.assertEqual(client_session_key, server_session_key) diff --git a/python/samba/tests/getopt.py b/python/samba/tests/getopt.py new file mode 100644 index 00000000000..14ee0a7428b --- /dev/null +++ b/python/samba/tests/getopt.py @@ -0,0 +1,55 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2011 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for option parsing. + +""" + +import optparse +from samba.getopt import ( + AUTO_USE_KERBEROS, + DONT_USE_KERBEROS, + MUST_USE_KERBEROS, + parse_kerberos_arg, + ) +import samba.tests + +class KerberosOptionTests(samba.tests.TestCase): + + def test_parse_true(self): + self.assertEquals( + MUST_USE_KERBEROS, parse_kerberos_arg("yes", "--kerberos")) + self.assertEquals( + MUST_USE_KERBEROS, parse_kerberos_arg("true", "--kerberos")) + self.assertEquals( + MUST_USE_KERBEROS, parse_kerberos_arg("1", "--kerberos")) + + def test_parse_false(self): + self.assertEquals( + DONT_USE_KERBEROS, parse_kerberos_arg("no", "--kerberos")) + self.assertEquals( + DONT_USE_KERBEROS, parse_kerberos_arg("false", "--kerberos")) + self.assertEquals( + DONT_USE_KERBEROS, parse_kerberos_arg("0", "--kerberos")) + + def test_parse_auto(self): + self.assertEquals( + AUTO_USE_KERBEROS, parse_kerberos_arg("auto", "--kerberos")) + + def test_parse_invalid(self): + self.assertRaises(optparse.OptionValueError, + parse_kerberos_arg, "blah?", "--kerberos") diff --git a/python/samba/tests/hostconfig.py b/python/samba/tests/hostconfig.py new file mode 100644 index 00000000000..526dc0fe4e3 --- /dev/null +++ b/python/samba/tests/hostconfig.py @@ -0,0 +1,74 @@ +# Unix SMB/CIFS implementation. Tests for shares +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2009 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.hostconfig.""" + +from samba.hostconfig import SharesContainer +from samba.tests import TestCase + + +class MockService(object): + + def __init__(self, data): + self.data = data + + def __getitem__(self, name): + return self.data[name] + + +class MockLoadParm(object): + + def __init__(self, data): + self.data = data + + def __getitem__(self, name): + return MockService(self.data[name]) + + def __len__(self): + return len(self.data) + + def services(self): + return self.data.keys() + + +class ShareTests(TestCase): + + def _get_shares(self, conf): + return SharesContainer(MockLoadParm(conf)) + + def test_len_no_global(self): + shares = self._get_shares({}) + self.assertEquals(0, len(shares)) + + def test_iter(self): + self.assertEquals([], list(self._get_shares({}))) + self.assertEquals([], list(self._get_shares({"global":{}}))) + self.assertEquals( + ["bla"], + list(self._get_shares({"global":{}, "bla":{}}))) + + def test_len(self): + shares = self._get_shares({"global": {}}) + self.assertEquals(0, len(shares)) + + def test_getitem_nonexistent(self): + shares = self._get_shares({"global": {}}) + self.assertRaises(KeyError, shares.__getitem__, "bla") + + def test_getitem_global(self): + shares = self._get_shares({"global": {}}) + self.assertRaises(KeyError, shares.__getitem__, "global") diff --git a/python/samba/tests/libsmb_samba_internal.py b/python/samba/tests/libsmb_samba_internal.py new file mode 100644 index 00000000000..fe9f197a2c2 --- /dev/null +++ b/python/samba/tests/libsmb_samba_internal.py @@ -0,0 +1,78 @@ +# Unix SMB/CIFS implementation. +# Copyright Volker Lendecke <vl@samba.org> 2012 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.samba3.libsmb_samba_internal.""" + +from samba.samba3 import libsmb_samba_internal +from samba.dcerpc import security +from samba.samba3 import param as s3param +from samba import credentials +import samba.tests +import threading +import sys +import os + +class LibsmbTestCase(samba.tests.TestCase): + + class OpenClose(threading.Thread): + + def __init__(self, conn, filename, num_ops): + threading.Thread.__init__(self) + self.conn = conn + self.filename = filename + self.num_ops = num_ops + self.exc = False + + def run(self): + c = self.conn + try: + for i in range(self.num_ops): + f = c.create(self.filename, CreateDisposition=3, + DesiredAccess=security.SEC_STD_DELETE) + c.delete_on_close(f, True) + c.close(f) + except Exception: + self.exc = sys.exc_info() + + def test_OpenClose(self): + + lp = s3param.get_context() + lp.load(os.getenv("SMB_CONF_PATH")) + + creds = credentials.Credentials() + creds.set_username(os.getenv("USERNAME")) + creds.set_password(os.getenv("PASSWORD")) + + c = libsmb_samba_internal.Conn(os.getenv("SERVER_IP"), "tmp", creds) + + mythreads = [] + + for i in range(3): + t = LibsmbTestCase.OpenClose(c, "test" + str(i), 10) + mythreads.append(t) + + for t in mythreads: + t.start() + + for t in mythreads: + t.join() + if t.exc: + raise t.exc[0](t.exc[1]) + +if __name__ == "__main__": + import unittest + unittest.main() diff --git a/python/samba/tests/messaging.py b/python/samba/tests/messaging.py new file mode 100644 index 00000000000..f0cd368195f --- /dev/null +++ b/python/samba/tests/messaging.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- +# +# Unix SMB/CIFS implementation. +# Copyright © Jelmer Vernooij <jelmer@samba.org> 2008 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.messaging.""" + +from samba.messaging import Messaging +from samba.tests import TestCase +from samba.dcerpc.server_id import server_id + +class MessagingTests(TestCase): + + def get_context(self, *args, **kwargs): + return Messaging(*args, **kwargs) + + def test_register(self): + x = self.get_context() + def callback(): + pass + msg_type = x.register(callback) + x.deregister(callback, msg_type) + + def test_all_servers(self): + x = self.get_context() + self.assertTrue(isinstance(x.irpc_all_servers(), list)) + + def test_by_name(self): + x = self.get_context() + for name in x.irpc_all_servers(): + self.assertTrue(isinstance(x.irpc_servers_byname(name.name), list)) + + def test_assign_server_id(self): + x = self.get_context() + self.assertTrue(isinstance(x.server_id, server_id)) + + def test_ping_speed(self): + server_ctx = self.get_context((0, 1)) + def ping_callback(src, data): + server_ctx.send(src, data) + def exit_callback(): + print "received exit" + msg_ping = server_ctx.register(ping_callback) + msg_exit = server_ctx.register(exit_callback) + + def pong_callback(): + print "received pong" + client_ctx = self.get_context((0, 2)) + msg_pong = client_ctx.register(pong_callback) + + client_ctx.send((0, 1), msg_ping, "testing") + client_ctx.send((0, 1), msg_ping, "") + diff --git a/python/samba/tests/netcmd.py b/python/samba/tests/netcmd.py new file mode 100644 index 00000000000..2cbac4e8bff --- /dev/null +++ b/python/samba/tests/netcmd.py @@ -0,0 +1,90 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2009-2011 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.netcmd.""" + +from cStringIO import StringIO +from samba.netcmd import Command +from samba.netcmd.testparm import cmd_testparm +from samba.netcmd.main import cmd_sambatool +import samba.tests + +class NetCmdTestCase(samba.tests.TestCase): + + def run_netcmd(self, cmd_klass, args, retcode=0): + cmd = cmd_klass(outf=StringIO(), errf=StringIO()) + try: + retval = cmd._run(cmd_klass.__name__, *args) + except Exception, e: + cmd.show_command_error(e) + retval = 1 + self.assertEquals(retcode, retval) + return cmd.outf.getvalue(), cmd.errf.getvalue() + + def iter_all_subcommands(self): + todo = [] + todo.extend(cmd_sambatool.subcommands.items()) + while todo: + (path, cmd) = todo.pop() + yield path, cmd + subcmds = getattr(cmd, "subcommands", {}) + todo.extend([(path + " " + k, v) for (k, v) in + subcmds.iteritems()]) + + +class TestParmTests(NetCmdTestCase): + + def test_no_client_ip(self): + out, err = self.run_netcmd(cmd_testparm, ["--client-name=foo"], + retcode=-1) + self.assertEquals("", out) + self.assertEquals( + "ERROR: Both a DNS name and an IP address are " + "required for the host access check\n", err) + + +class CommandTests(NetCmdTestCase): + + def test_description(self): + class cmd_foo(Command): + """Mydescription""" + self.assertEquals("Mydescription", cmd_foo().short_description) + + def test_name(self): + class cmd_foo(Command): + pass + self.assertEquals("foo", cmd_foo().name) + + def test_synopsis_everywhere(self): + missing = [] + for path, cmd in self.iter_all_subcommands(): + if cmd.synopsis is None: + missing.append(path) + if missing: + self.fail("The following commands do not have a synopsis set: %r" % + missing) + + def test_short_description_everywhere(self): + missing = [] + for path, cmd in self.iter_all_subcommands(): + if cmd.short_description is None: + missing.append(path) + if not missing: + return + self.fail( + "The following commands do not have a short description set: %r" % + missing) diff --git a/python/samba/tests/ntacls.py b/python/samba/tests/ntacls.py new file mode 100644 index 00000000000..aa9ef6852ba --- /dev/null +++ b/python/samba/tests/ntacls.py @@ -0,0 +1,83 @@ +# Unix SMB/CIFS implementation. Tests for ntacls manipulation +# Copyright (C) Matthieu Patou <mat@matws.net> 2009-2010 +# Copyright (C) Andrew Bartlett 2012 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.ntacls.""" + +from samba.ntacls import setntacl, getntacl, XattrBackendError +from samba.dcerpc import xattr, security +from samba.param import LoadParm +from samba.tests import TestCaseInTempDir, TestSkipped +import random +import os + +class NtaclsTests(TestCaseInTempDir): + + def test_setntacl(self): + lp = LoadParm() + acl = "O:S-1-5-21-2212615479-2695158682-2101375467-512G:S-1-5-21-2212615479-2695158682-2101375467-513D:(A;OICI;0x001f01ff;;;S-1-5-21-2212615479-2695158682-2101375467-512)" + open(self.tempf, 'w').write("empty") + lp.set("posix:eadb",os.path.join(self.tempdir,"eadbtest.tdb")) + setntacl(lp, self.tempf, acl, "S-1-5-21-2212615479-2695158682-2101375467") + os.unlink(os.path.join(self.tempdir,"eadbtest.tdb")) + + def test_setntacl_getntacl(self): + lp = LoadParm() + acl = "O:S-1-5-21-2212615479-2695158682-2101375467-512G:S-1-5-21-2212615479-2695158682-2101375467-513D:(A;OICI;0x001f01ff;;;S-1-5-21-2212615479-2695158682-2101375467-512)" + open(self.tempf, 'w').write("empty") + lp.set("posix:eadb",os.path.join(self.tempdir,"eadbtest.tdb")) + setntacl(lp,self.tempf,acl,"S-1-5-21-2212615479-2695158682-2101375467") + facl = getntacl(lp,self.tempf) + anysid = security.dom_sid(security.SID_NT_SELF) + self.assertEquals(facl.as_sddl(anysid),acl) + os.unlink(os.path.join(self.tempdir,"eadbtest.tdb")) + + def test_setntacl_getntacl_param(self): + lp = LoadParm() + acl = "O:S-1-5-21-2212615479-2695158682-2101375467-512G:S-1-5-21-2212615479-2695158682-2101375467-513D:(A;OICI;0x001f01ff;;;S-1-5-21-2212615479-2695158682-2101375467-512)" + open(self.tempf, 'w').write("empty") + setntacl(lp,self.tempf,acl,"S-1-5-21-2212615479-2695158682-2101375467","tdb",os.path.join(self.tempdir,"eadbtest.tdb")) + facl=getntacl(lp,self.tempf,"tdb",os.path.join(self.tempdir,"eadbtest.tdb")) + domsid=security.dom_sid(security.SID_NT_SELF) + self.assertEquals(facl.as_sddl(domsid),acl) + os.unlink(os.path.join(self.tempdir,"eadbtest.tdb")) + + def test_setntacl_invalidbackend(self): + lp = LoadParm() + acl = "O:S-1-5-21-2212615479-2695158682-2101375467-512G:S-1-5-21-2212615479-2695158682-2101375467-513D:(A;OICI;0x001f01ff;;;S-1-5-21-2212615479-2695158682-2101375467-512)" + open(self.tempf, 'w').write("empty") + self.assertRaises(XattrBackendError, setntacl, lp, self.tempf, acl, "S-1-5-21-2212615479-2695158682-2101375467","ttdb", os.path.join(self.tempdir,"eadbtest.tdb")) + + def test_setntacl_forcenative(self): + if os.getuid() == 0: + raise TestSkipped("Running test as root, test skipped") + lp = LoadParm() + acl = "O:S-1-5-21-2212615479-2695158682-2101375467-512G:S-1-5-21-2212615479-2695158682-2101375467-513D:(A;OICI;0x001f01ff;;;S-1-5-21-2212615479-2695158682-2101375467-512)" + open(self.tempf, 'w').write("empty") + lp.set("posix:eadb", os.path.join(self.tempdir,"eadbtest.tdb")) + self.assertRaises(Exception, setntacl, lp, self.tempf ,acl, + "S-1-5-21-2212615479-2695158682-2101375467","native") + + + def setUp(self): + super(NtaclsTests, self).setUp() + self.tempf = os.path.join(self.tempdir, "test") + open(self.tempf, 'w').write("empty") + + def tearDown(self): + os.unlink(self.tempf) + super(NtaclsTests, self).tearDown() diff --git a/python/samba/tests/param.py b/python/samba/tests/param.py new file mode 100644 index 00000000000..f539eba140d --- /dev/null +++ b/python/samba/tests/param.py @@ -0,0 +1,57 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.param.""" + +from samba import param +import samba.tests + +class LoadParmTestCase(samba.tests.TestCase): + + def test_init(self): + file = param.LoadParm() + self.assertTrue(file is not None) + + def test_length(self): + file = param.LoadParm() + self.assertEquals(0, len(file)) + + def test_set_workgroup(self): + file = param.LoadParm() + file.set("workgroup", "bla") + self.assertEquals("BLA", file.get("workgroup")) + + def test_is_mydomain(self): + file = param.LoadParm() + file.set("workgroup", "bla") + self.assertTrue(file.is_mydomain("BLA")) + self.assertFalse(file.is_mydomain("FOOBAR")) + + def test_is_myname(self): + file = param.LoadParm() + file.set("netbios name", "bla") + self.assertTrue(file.is_myname("BLA")) + self.assertFalse(file.is_myname("FOOBAR")) + + def test_load_default(self): + file = param.LoadParm() + file.load_default() + + def test_section_nonexistent(self): + samba_lp = param.LoadParm() + samba_lp.load_default() + self.assertRaises(KeyError, samba_lp.__getitem__, "nonexistent") diff --git a/python/samba/tests/policy.py b/python/samba/tests/policy.py new file mode 100644 index 00000000000..b2457451528 --- /dev/null +++ b/python/samba/tests/policy.py @@ -0,0 +1,34 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2010 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for the libpolicy Python bindings. + +""" + +from samba.tests import TestCase +from samba import policy + + +class PolicyTests(TestCase): + + def test_get_gpo_flags(self): + self.assertEquals(["GPO_FLAG_USER_DISABLE"], + policy.get_gpo_flags(policy.GPO_FLAG_USER_DISABLE)) + + def test_get_gplink_options(self): + self.assertEquals(["GPLINK_OPT_DISABLE"], + policy.get_gplink_options(policy.GPLINK_OPT_DISABLE)) diff --git a/python/samba/tests/posixacl.py b/python/samba/tests/posixacl.py new file mode 100644 index 00000000000..7cd22ebccd1 --- /dev/null +++ b/python/samba/tests/posixacl.py @@ -0,0 +1,732 @@ +# Unix SMB/CIFS implementation. Tests for NT and posix ACL manipulation +# Copyright (C) Matthieu Patou <mat@matws.net> 2009-2010 +# Copyright (C) Andrew Bartlett 2012 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for the Samba3 NT -> posix ACL layer""" + +from samba.ntacls import setntacl, getntacl, checkset_backend +from samba.dcerpc import xattr, security, smb_acl, idmap +from samba.param import LoadParm +from samba.tests import TestCaseInTempDir +from samba import provision +import random +import os +from samba.samba3 import smbd, passdb +from samba.samba3 import param as s3param + +# To print a posix ACL use: +# for entry in posix_acl.acl: +# print "a_type: %d" % entry.a_type +# print "a_perm: %o" % entry.a_perm +# print "uid: %d" % entry.uid +# print "gid: %d" % entry.gid + +class PosixAclMappingTests(TestCaseInTempDir): + + def test_setntacl(self): + acl = "O:S-1-5-21-2212615479-2695158682-2101375467-512G:S-1-5-21-2212615479-2695158682-2101375467-513D:(A;OICI;0x001f01ff;;;S-1-5-21-2212615479-2695158682-2101375467-512)" + setntacl(self.lp, self.tempf, acl, "S-1-5-21-2212615479-2695158682-2101375467", use_ntvfs=False) + + def test_setntacl_smbd_getntacl(self): + acl = "O:S-1-5-21-2212615479-2695158682-2101375467-512G:S-1-5-21-2212615479-2695158682-2101375467-513D:(A;OICI;0x001f01ff;;;S-1-5-21-2212615479-2695158682-2101375467-512)" + setntacl(self.lp, self.tempf, acl, "S-1-5-21-2212615479-2695158682-2101375467", use_ntvfs=True) + facl = getntacl(self.lp, self.tempf, direct_db_access=True) + anysid = security.dom_sid(security.SID_NT_SELF) + self.assertEquals(facl.as_sddl(anysid),acl) + + def test_setntacl_smbd_setposixacl_getntacl(self): + acl = "O:S-1-5-21-2212615479-2695158682-2101375467-512G:S-1-5-21-2212615479-2695158682-2101375467-513D:(A;OICI;0x001f01ff;;;S-1-5-21-2212615479-2695158682-2101375467-512)" + setntacl(self.lp, self.tempf, acl, "S-1-5-21-2212615479-2695158682-2101375467", use_ntvfs=True) + + # This will invalidate the ACL, as we have a hook! + smbd.set_simple_acl(self.tempf, 0640) + + # However, this only asks the xattr + try: + facl = getntacl(self.lp, self.tempf, direct_db_access=True) + self.assertTrue(False) + except TypeError: + pass + + def test_setntacl_invalidate_getntacl(self): + acl = "O:S-1-5-21-2212615479-2695158682-2101375467-512G:S-1-5-21-2212615479-2695158682-2101375467-513D:(A;OICI;0x001f01ff;;;S-1-5-21-2212615479-2695158682-2101375467-512)" + setntacl(self.lp, self.tempf, acl, "S-1-5-21-2212615479-2695158682-2101375467", use_ntvfs=True) + + # This should invalidate the ACL, as we include the posix ACL in the hash + (backend_obj, dbname) = checkset_backend(self.lp, None, None) + backend_obj.wrap_setxattr(dbname, + self.tempf, "system.fake_access_acl", "") + + #however, as this is direct DB access, we do not notice it + facl = getntacl(self.lp, self.tempf, direct_db_access=True) + anysid = security.dom_sid(security.SID_NT_SELF) + self.assertEquals(acl, facl.as_sddl(anysid)) + + def test_setntacl_invalidate_getntacl_smbd(self): + acl = "O:S-1-5-21-2212615479-2695158682-2101375467-512G:S-1-5-21-2212615479-2695158682-2101375467-513D:(A;OICI;0x001f01ff;;;S-1-5-21-2212615479-2695158682-2101375467-512)" + setntacl(self.lp, self.tempf, acl, "S-1-5-21-2212615479-2695158682-2101375467", use_ntvfs=False) + + # This should invalidate the ACL, as we include the posix ACL in the hash + (backend_obj, dbname) = checkset_backend(self.lp, None, None) + backend_obj.wrap_setxattr(dbname, + self.tempf, "system.fake_access_acl", "") + + #the hash would break, and we return an ACL based only on the mode, except we set the ACL using the 'ntvfs' mode that doesn't include a hash + facl = getntacl(self.lp, self.tempf) + anysid = security.dom_sid(security.SID_NT_SELF) + self.assertEquals(acl, facl.as_sddl(anysid)) + + def test_setntacl_smbd_invalidate_getntacl_smbd(self): + acl = "O:S-1-5-21-2212615479-2695158682-2101375467-512G:S-1-5-21-2212615479-2695158682-2101375467-513D:(A;OICI;0x001f01ff;;;S-1-5-21-2212615479-2695158682-2101375467-512)" + simple_acl_from_posix = "O:S-1-5-21-2212615479-2695158682-2101375467-512G:S-1-5-21-2212615479-2695158682-2101375467-513D:(A;;0x001f01ff;;;S-1-5-21-2212615479-2695158682-2101375467-512)(A;;0x001200a9;;;S-1-5-21-2212615479-2695158682-2101375467-513)(A;;;;;WD)" + os.chmod(self.tempf, 0750) + setntacl(self.lp, self.tempf, acl, "S-1-5-21-2212615479-2695158682-2101375467", use_ntvfs=False) + + # This should invalidate the ACL, as we include the posix ACL in the hash + (backend_obj, dbname) = checkset_backend(self.lp, None, None) + backend_obj.wrap_setxattr(dbname, + self.tempf, "system.fake_access_acl", "") + + #the hash will break, and we return an ACL based only on the mode + facl = getntacl(self.lp, self.tempf, direct_db_access=False) + anysid = security.dom_sid(security.SID_NT_SELF) + self.assertEquals(simple_acl_from_posix, facl.as_sddl(anysid)) + + def test_setntacl_smbd_dont_invalidate_getntacl_smbd(self): + # set an ACL on a tempfile + acl = "O:S-1-5-21-2212615479-2695158682-2101375467-512G:S-1-5-21-2212615479-2695158682-2101375467-513D:(A;OICI;0x001f01ff;;;S-1-5-21-2212615479-2695158682-2101375467-512)" + os.chmod(self.tempf, 0750) + setntacl(self.lp, self.tempf, acl, "S-1-5-21-2212615479-2695158682-2101375467", use_ntvfs=False) + + # now influence the POSIX ACL->SD mapping it returns something else than + # what was set previously + # this should not invalidate the hash and the complete ACL should still + # be returned + self.lp.set("profile acls", "yes") + # we should still get back the ACL (and not one mapped from POSIX ACL) + facl = getntacl(self.lp, self.tempf, direct_db_access=False) + self.lp.set("profile acls", "no") + anysid = security.dom_sid(security.SID_NT_SELF) + self.assertEquals(acl, facl.as_sddl(anysid)) + + def test_setntacl_getntacl_smbd(self): + acl = "O:S-1-5-21-2212615479-2695158682-2101375467-512G:S-1-5-21-2212615479-2695158682-2101375467-513D:(A;OICI;0x001f01ff;;;S-1-5-21-2212615479-2695158682-2101375467-512)" + setntacl(self.lp, self.tempf, acl, "S-1-5-21-2212615479-2695158682-2101375467", use_ntvfs=True) + facl = getntacl(self.lp, self.tempf, direct_db_access=False) + anysid = security.dom_sid(security.SID_NT_SELF) + self.assertEquals(facl.as_sddl(anysid),acl) + + def test_setntacl_smbd_getntacl_smbd(self): + acl = "O:S-1-5-21-2212615479-2695158682-2101375467-512G:S-1-5-21-2212615479-2695158682-2101375467-513D:(A;OICI;0x001f01ff;;;S-1-5-21-2212615479-2695158682-2101375467-512)" + setntacl(self.lp, self.tempf, acl, "S-1-5-21-2212615479-2695158682-2101375467", use_ntvfs=False) + facl = getntacl(self.lp, self.tempf, direct_db_access=False) + anysid = security.dom_sid(security.SID_NT_SELF) + self.assertEquals(facl.as_sddl(anysid),acl) + + def test_setntacl_smbd_setposixacl_getntacl_smbd(self): + acl = "O:S-1-5-21-2212615479-2695158682-2101375467-512G:S-1-5-21-2212615479-2695158682-2101375467-513D:(A;OICI;0x001f01ff;;;S-1-5-21-2212615479-2695158682-2101375467-512)" + simple_acl_from_posix = "O:S-1-5-21-2212615479-2695158682-2101375467-512G:S-1-5-21-2212615479-2695158682-2101375467-513D:(A;;0x001f019f;;;S-1-5-21-2212615479-2695158682-2101375467-512)(A;;0x00120089;;;S-1-5-21-2212615479-2695158682-2101375467-513)(A;;;;;WD)" + setntacl(self.lp, self.tempf, acl, "S-1-5-21-2212615479-2695158682-2101375467", use_ntvfs=False) + # This invalidates the hash of the NT acl just set because there is a hook in the posix ACL set code + smbd.set_simple_acl(self.tempf, 0640) + facl = getntacl(self.lp, self.tempf, direct_db_access=False) + anysid = security.dom_sid(security.SID_NT_SELF) + self.assertEquals(simple_acl_from_posix, facl.as_sddl(anysid)) + + def test_setntacl_smbd_setposixacl_group_getntacl_smbd(self): + acl = "O:S-1-5-21-2212615479-2695158682-2101375467-512G:S-1-5-21-2212615479-2695158682-2101375467-513D:(A;OICI;0x001f01ff;;;S-1-5-21-2212615479-2695158682-2101375467-512)" + BA_sid = security.dom_sid(security.SID_BUILTIN_ADMINISTRATORS) + simple_acl_from_posix = "O:S-1-5-21-2212615479-2695158682-2101375467-512G:S-1-5-21-2212615479-2695158682-2101375467-513D:(A;;0x001f019f;;;S-1-5-21-2212615479-2695158682-2101375467-512)(A;;0x00120089;;;BA)(A;;0x00120089;;;S-1-5-21-2212615479-2695158682-2101375467-513)(A;;;;;WD)" + setntacl(self.lp, self.tempf, acl, "S-1-5-21-2212615479-2695158682-2101375467", use_ntvfs=False) + # This invalidates the hash of the NT acl just set because there is a hook in the posix ACL set code + s4_passdb = passdb.PDB(self.lp.get("passdb backend")) + (BA_gid,BA_type) = s4_passdb.sid_to_id(BA_sid) + smbd.set_simple_acl(self.tempf, 0640, BA_gid) + + # This should re-calculate an ACL based on the posix details + facl = getntacl(self.lp,self.tempf, direct_db_access=False) + anysid = security.dom_sid(security.SID_NT_SELF) + self.assertEquals(simple_acl_from_posix, facl.as_sddl(anysid)) + + def test_setntacl_smbd_getntacl_smbd_gpo(self): + acl = "O:DAG:DUD:P(A;OICI;0x001f01ff;;;DA)(A;OICI;0x001f01ff;;;EA)(A;OICIIO;0x001f01ff;;;CO)(A;OICI;0x001f01ff;;;DA)(A;OICI;0x001f01ff;;;SY)(A;OICI;0x001200a9;;;AU)(A;OICI;0x001200a9;;;ED)S:AI(OU;CIIDSA;WP;f30e3bbe-9ff0-11d1-b603-0000f80367c1;bf967aa5-0de6-11d0-a285-00aa003049e2;WD)(OU;CIIDSA;WP;f30e3bbf-9ff0-11d1-b603-0000f80367c1;bf967aa5-0de6-11d0-a285-00aa003049e2;WD)" + setntacl(self.lp, self.tempf, acl, "S-1-5-21-2212615479-2695158682-2101375467", use_ntvfs=False) + facl = getntacl(self.lp, self.tempf, direct_db_access=False) + domsid = security.dom_sid("S-1-5-21-2212615479-2695158682-2101375467") + self.assertEquals(facl.as_sddl(domsid),acl) + + def test_setntacl_getposixacl(self): + acl = "O:S-1-5-21-2212615479-2695158682-2101375467-512G:S-1-5-21-2212615479-2695158682-2101375467-513D:(A;OICI;0x001f01ff;;;S-1-5-21-2212615479-2695158682-2101375467-512)" + setntacl(self.lp, self.tempf, acl, "S-1-5-21-2212615479-2695158682-2101375467", use_ntvfs=False) + facl = getntacl(self.lp, self.tempf) + anysid = security.dom_sid(security.SID_NT_SELF) + self.assertEquals(facl.as_sddl(anysid),acl) + posix_acl = smbd.get_sys_acl(self.tempf, smb_acl.SMB_ACL_TYPE_ACCESS) + + def test_setposixacl_getposixacl(self): + smbd.set_simple_acl(self.tempf, 0640) + posix_acl = smbd.get_sys_acl(self.tempf, smb_acl.SMB_ACL_TYPE_ACCESS) + self.assertEquals(posix_acl.count, 4) + + self.assertEquals(posix_acl.acl[0].a_type, smb_acl.SMB_ACL_USER_OBJ) + self.assertEquals(posix_acl.acl[0].a_perm, 6) + + self.assertEquals(posix_acl.acl[1].a_type, smb_acl.SMB_ACL_GROUP_OBJ) + self.assertEquals(posix_acl.acl[1].a_perm, 4) + + self.assertEquals(posix_acl.acl[2].a_type, smb_acl.SMB_ACL_OTHER) + self.assertEquals(posix_acl.acl[2].a_perm, 0) + + self.assertEquals(posix_acl.acl[3].a_type, smb_acl.SMB_ACL_MASK) + self.assertEquals(posix_acl.acl[3].a_perm, 6) + + def test_setposixacl_getntacl(self): + acl = "" + smbd.set_simple_acl(self.tempf, 0750) + try: + facl = getntacl(self.lp, self.tempf) + self.assertTrue(False) + except TypeError: + # We don't expect the xattr to be filled in in this case + pass + + def test_setposixacl_getntacl_smbd(self): + s4_passdb = passdb.PDB(self.lp.get("passdb backend")) + group_SID = s4_passdb.gid_to_sid(os.stat(self.tempf).st_gid) + user_SID = s4_passdb.uid_to_sid(os.stat(self.tempf).st_uid) + smbd.set_simple_acl(self.tempf, 0640) + facl = getntacl(self.lp, self.tempf, direct_db_access=False) + acl = "O:%sG:%sD:(A;;0x001f019f;;;%s)(A;;0x00120089;;;%s)(A;;;;;WD)" % (user_SID, group_SID, user_SID, group_SID) + anysid = security.dom_sid(security.SID_NT_SELF) + self.assertEquals(acl, facl.as_sddl(anysid)) + + def test_setposixacl_dir_getntacl_smbd(self): + s4_passdb = passdb.PDB(self.lp.get("passdb backend")) + user_SID = s4_passdb.uid_to_sid(os.stat(self.tempdir).st_uid) + BA_sid = security.dom_sid(security.SID_BUILTIN_ADMINISTRATORS) + s4_passdb = passdb.PDB(self.lp.get("passdb backend")) + (BA_id,BA_type) = s4_passdb.sid_to_id(BA_sid) + self.assertEquals(BA_type, idmap.ID_TYPE_BOTH) + SO_sid = security.dom_sid(security.SID_BUILTIN_SERVER_OPERATORS) + (SO_id,SO_type) = s4_passdb.sid_to_id(SO_sid) + self.assertEquals(SO_type, idmap.ID_TYPE_BOTH) + smbd.chown(self.tempdir, BA_id, SO_id) + smbd.set_simple_acl(self.tempdir, 0750) + facl = getntacl(self.lp, self.tempdir, direct_db_access=False) + acl = "O:BAG:SOD:(A;;0x001f01ff;;;BA)(A;;0x001200a9;;;SO)(A;;;;;WD)(A;OICIIO;0x001f01ff;;;CO)(A;OICIIO;0x001f01ff;;;CG)(A;OICIIO;0x001f01ff;;;WD)" + + anysid = security.dom_sid(security.SID_NT_SELF) + self.assertEquals(acl, facl.as_sddl(anysid)) + + def test_setposixacl_group_getntacl_smbd(self): + BA_sid = security.dom_sid(security.SID_BUILTIN_ADMINISTRATORS) + s4_passdb = passdb.PDB(self.lp.get("passdb backend")) + (BA_gid,BA_type) = s4_passdb.sid_to_id(BA_sid) + group_SID = s4_passdb.gid_to_sid(os.stat(self.tempf).st_gid) + user_SID = s4_passdb.uid_to_sid(os.stat(self.tempf).st_uid) + self.assertEquals(BA_type, idmap.ID_TYPE_BOTH) + smbd.set_simple_acl(self.tempf, 0640, BA_gid) + facl = getntacl(self.lp, self.tempf, direct_db_access=False) + domsid = passdb.get_global_sam_sid() + acl = "O:%sG:%sD:(A;;0x001f019f;;;%s)(A;;0x00120089;;;BA)(A;;0x00120089;;;%s)(A;;;;;WD)" % (user_SID, group_SID, user_SID, group_SID) + anysid = security.dom_sid(security.SID_NT_SELF) + self.assertEquals(acl, facl.as_sddl(anysid)) + + def test_setposixacl_getposixacl(self): + smbd.set_simple_acl(self.tempf, 0640) + posix_acl = smbd.get_sys_acl(self.tempf, smb_acl.SMB_ACL_TYPE_ACCESS) + self.assertEquals(posix_acl.count, 4) + + self.assertEquals(posix_acl.acl[0].a_type, smb_acl.SMB_ACL_USER_OBJ) + self.assertEquals(posix_acl.acl[0].a_perm, 6) + + self.assertEquals(posix_acl.acl[1].a_type, smb_acl.SMB_ACL_GROUP_OBJ) + self.assertEquals(posix_acl.acl[1].a_perm, 4) + + self.assertEquals(posix_acl.acl[2].a_type, smb_acl.SMB_ACL_OTHER) + self.assertEquals(posix_acl.acl[2].a_perm, 0) + + self.assertEquals(posix_acl.acl[3].a_type, smb_acl.SMB_ACL_MASK) + self.assertEquals(posix_acl.acl[3].a_perm, 7) + + def test_setposixacl_dir_getposixacl(self): + smbd.set_simple_acl(self.tempdir, 0750) + posix_acl = smbd.get_sys_acl(self.tempdir, smb_acl.SMB_ACL_TYPE_ACCESS) + self.assertEquals(posix_acl.count, 4) + + self.assertEquals(posix_acl.acl[0].a_type, smb_acl.SMB_ACL_USER_OBJ) + self.assertEquals(posix_acl.acl[0].a_perm, 7) + + self.assertEquals(posix_acl.acl[1].a_type, smb_acl.SMB_ACL_GROUP_OBJ) + self.assertEquals(posix_acl.acl[1].a_perm, 5) + + self.assertEquals(posix_acl.acl[2].a_type, smb_acl.SMB_ACL_OTHER) + self.assertEquals(posix_acl.acl[2].a_perm, 0) + + self.assertEquals(posix_acl.acl[3].a_type, smb_acl.SMB_ACL_MASK) + self.assertEquals(posix_acl.acl[3].a_perm, 7) + + def test_setposixacl_group_getposixacl(self): + BA_sid = security.dom_sid(security.SID_BUILTIN_ADMINISTRATORS) + s4_passdb = passdb.PDB(self.lp.get("passdb backend")) + (BA_gid,BA_type) = s4_passdb.sid_to_id(BA_sid) + self.assertEquals(BA_type, idmap.ID_TYPE_BOTH) + smbd.set_simple_acl(self.tempf, 0670, BA_gid) + posix_acl = smbd.get_sys_acl(self.tempf, smb_acl.SMB_ACL_TYPE_ACCESS) + + self.assertEquals(posix_acl.count, 5) + + self.assertEquals(posix_acl.acl[0].a_type, smb_acl.SMB_ACL_USER_OBJ) + self.assertEquals(posix_acl.acl[0].a_perm, 6) + + self.assertEquals(posix_acl.acl[1].a_type, smb_acl.SMB_ACL_GROUP_OBJ) + self.assertEquals(posix_acl.acl[1].a_perm, 7) + + self.assertEquals(posix_acl.acl[2].a_type, smb_acl.SMB_ACL_OTHER) + self.assertEquals(posix_acl.acl[2].a_perm, 0) + + self.assertEquals(posix_acl.acl[3].a_type, smb_acl.SMB_ACL_GROUP) + self.assertEquals(posix_acl.acl[3].a_perm, 7) + self.assertEquals(posix_acl.acl[3].info.gid, BA_gid) + + self.assertEquals(posix_acl.acl[4].a_type, smb_acl.SMB_ACL_MASK) + self.assertEquals(posix_acl.acl[4].a_perm, 7) + + def test_setntacl_sysvol_check_getposixacl(self): + acl = provision.SYSVOL_ACL + domsid = passdb.get_global_sam_sid() + setntacl(self.lp, self.tempf,acl,str(domsid), use_ntvfs=False) + facl = getntacl(self.lp, self.tempf) + self.assertEquals(facl.as_sddl(domsid),acl) + posix_acl = smbd.get_sys_acl(self.tempf, smb_acl.SMB_ACL_TYPE_ACCESS) + + LA_sid = security.dom_sid(str(domsid)+"-"+str(security.DOMAIN_RID_ADMINISTRATOR)) + BA_sid = security.dom_sid(security.SID_BUILTIN_ADMINISTRATORS) + SO_sid = security.dom_sid(security.SID_BUILTIN_SERVER_OPERATORS) + SY_sid = security.dom_sid(security.SID_NT_SYSTEM) + AU_sid = security.dom_sid(security.SID_NT_AUTHENTICATED_USERS) + + s4_passdb = passdb.PDB(self.lp.get("passdb backend")) + + # These assertions correct for current plugin_s4_dc selftest + # configuration. When other environments have a broad range of + # groups mapped via passdb, we can relax some of these checks + (LA_uid,LA_type) = s4_passdb.sid_to_id(LA_sid) + self.assertEquals(LA_type, idmap.ID_TYPE_UID) + (BA_gid,BA_type) = s4_passdb.sid_to_id(BA_sid) + self.assertEquals(BA_type, idmap.ID_TYPE_BOTH) + (SO_gid,SO_type) = s4_passdb.sid_to_id(SO_sid) + self.assertEquals(SO_type, idmap.ID_TYPE_BOTH) + (SY_gid,SY_type) = s4_passdb.sid_to_id(SY_sid) + self.assertEquals(SO_type, idmap.ID_TYPE_BOTH) + (AU_gid,AU_type) = s4_passdb.sid_to_id(AU_sid) + self.assertEquals(AU_type, idmap.ID_TYPE_BOTH) + + self.assertEquals(posix_acl.count, 9) + + self.assertEquals(posix_acl.acl[0].a_type, smb_acl.SMB_ACL_GROUP) + self.assertEquals(posix_acl.acl[0].a_perm, 7) + self.assertEquals(posix_acl.acl[0].info.gid, BA_gid) + + self.assertEquals(posix_acl.acl[1].a_type, smb_acl.SMB_ACL_USER) + self.assertEquals(posix_acl.acl[1].a_perm, 6) + self.assertEquals(posix_acl.acl[1].info.uid, LA_uid) + + self.assertEquals(posix_acl.acl[2].a_type, smb_acl.SMB_ACL_OTHER) + self.assertEquals(posix_acl.acl[2].a_perm, 0) + + self.assertEquals(posix_acl.acl[3].a_type, smb_acl.SMB_ACL_USER_OBJ) + self.assertEquals(posix_acl.acl[3].a_perm, 6) + + self.assertEquals(posix_acl.acl[4].a_type, smb_acl.SMB_ACL_GROUP_OBJ) + self.assertEquals(posix_acl.acl[4].a_perm, 7) + + self.assertEquals(posix_acl.acl[5].a_type, smb_acl.SMB_ACL_GROUP) + self.assertEquals(posix_acl.acl[5].a_perm, 5) + self.assertEquals(posix_acl.acl[5].info.gid, SO_gid) + + self.assertEquals(posix_acl.acl[6].a_type, smb_acl.SMB_ACL_GROUP) + self.assertEquals(posix_acl.acl[6].a_perm, 7) + self.assertEquals(posix_acl.acl[6].info.gid, SY_gid) + + self.assertEquals(posix_acl.acl[7].a_type, smb_acl.SMB_ACL_GROUP) + self.assertEquals(posix_acl.acl[7].a_perm, 5) + self.assertEquals(posix_acl.acl[7].info.gid, AU_gid) + + self.assertEquals(posix_acl.acl[8].a_type, smb_acl.SMB_ACL_MASK) + self.assertEquals(posix_acl.acl[8].a_perm, 7) + + +# check that it matches: +# user::rwx +# user:root:rwx (selftest user actually) +# group::rwx +# group:Local Admins:rwx +# group:3000000:r-x +# group:3000001:rwx +# group:3000002:r-x +# mask::rwx +# other::--- + +# +# This is in this order in the NDR smb_acl (not re-orderded for display) +# a_type: GROUP +# a_perm: 7 +# uid: -1 +# gid: 10 +# a_type: USER +# a_perm: 6 +# uid: 0 (selftest user actually) +# gid: -1 +# a_type: OTHER +# a_perm: 0 +# uid: -1 +# gid: -1 +# a_type: USER_OBJ +# a_perm: 6 +# uid: -1 +# gid: -1 +# a_type: GROUP_OBJ +# a_perm: 7 +# uid: -1 +# gid: -1 +# a_type: GROUP +# a_perm: 5 +# uid: -1 +# gid: 3000020 +# a_type: GROUP +# a_perm: 7 +# uid: -1 +# gid: 3000000 +# a_type: GROUP +# a_perm: 5 +# uid: -1 +# gid: 3000001 +# a_type: MASK +# a_perm: 7 +# uid: -1 +# gid: -1 + +# + + + def test_setntacl_sysvol_dir_check_getposixacl(self): + acl = provision.SYSVOL_ACL + domsid = passdb.get_global_sam_sid() + setntacl(self.lp, self.tempdir,acl,str(domsid), use_ntvfs=False) + facl = getntacl(self.lp, self.tempdir) + self.assertEquals(facl.as_sddl(domsid),acl) + posix_acl = smbd.get_sys_acl(self.tempdir, smb_acl.SMB_ACL_TYPE_ACCESS) + + LA_sid = security.dom_sid(str(domsid)+"-"+str(security.DOMAIN_RID_ADMINISTRATOR)) + BA_sid = security.dom_sid(security.SID_BUILTIN_ADMINISTRATORS) + SO_sid = security.dom_sid(security.SID_BUILTIN_SERVER_OPERATORS) + SY_sid = security.dom_sid(security.SID_NT_SYSTEM) + AU_sid = security.dom_sid(security.SID_NT_AUTHENTICATED_USERS) + + s4_passdb = passdb.PDB(self.lp.get("passdb backend")) + + # These assertions correct for current plugin_s4_dc selftest + # configuration. When other environments have a broad range of + # groups mapped via passdb, we can relax some of these checks + (LA_uid,LA_type) = s4_passdb.sid_to_id(LA_sid) + self.assertEquals(LA_type, idmap.ID_TYPE_UID) + (BA_gid,BA_type) = s4_passdb.sid_to_id(BA_sid) + self.assertEquals(BA_type, idmap.ID_TYPE_BOTH) + (SO_gid,SO_type) = s4_passdb.sid_to_id(SO_sid) + self.assertEquals(SO_type, idmap.ID_TYPE_BOTH) + (SY_gid,SY_type) = s4_passdb.sid_to_id(SY_sid) + self.assertEquals(SO_type, idmap.ID_TYPE_BOTH) + (AU_gid,AU_type) = s4_passdb.sid_to_id(AU_sid) + self.assertEquals(AU_type, idmap.ID_TYPE_BOTH) + + self.assertEquals(posix_acl.count, 9) + + self.assertEquals(posix_acl.acl[0].a_type, smb_acl.SMB_ACL_GROUP) + self.assertEquals(posix_acl.acl[0].a_perm, 7) + self.assertEquals(posix_acl.acl[0].info.gid, BA_gid) + + self.assertEquals(posix_acl.acl[1].a_type, smb_acl.SMB_ACL_USER) + self.assertEquals(posix_acl.acl[1].a_perm, 7) + self.assertEquals(posix_acl.acl[1].info.uid, LA_uid) + + self.assertEquals(posix_acl.acl[2].a_type, smb_acl.SMB_ACL_OTHER) + self.assertEquals(posix_acl.acl[2].a_perm, 0) + + self.assertEquals(posix_acl.acl[3].a_type, smb_acl.SMB_ACL_USER_OBJ) + self.assertEquals(posix_acl.acl[3].a_perm, 7) + + self.assertEquals(posix_acl.acl[4].a_type, smb_acl.SMB_ACL_GROUP_OBJ) + self.assertEquals(posix_acl.acl[4].a_perm, 7) + + self.assertEquals(posix_acl.acl[5].a_type, smb_acl.SMB_ACL_GROUP) + self.assertEquals(posix_acl.acl[5].a_perm, 5) + self.assertEquals(posix_acl.acl[5].info.gid, SO_gid) + + self.assertEquals(posix_acl.acl[6].a_type, smb_acl.SMB_ACL_GROUP) + self.assertEquals(posix_acl.acl[6].a_perm, 7) + self.assertEquals(posix_acl.acl[6].info.gid, SY_gid) + + self.assertEquals(posix_acl.acl[7].a_type, smb_acl.SMB_ACL_GROUP) + self.assertEquals(posix_acl.acl[7].a_perm, 5) + self.assertEquals(posix_acl.acl[7].info.gid, AU_gid) + + self.assertEquals(posix_acl.acl[8].a_type, smb_acl.SMB_ACL_MASK) + self.assertEquals(posix_acl.acl[8].a_perm, 7) + + +# check that it matches: +# user::rwx +# user:root:rwx (selftest user actually) +# group::rwx +# group:3000000:rwx +# group:3000001:r-x +# group:3000002:rwx +# group:3000003:r-x +# mask::rwx +# other::--- + + + def test_setntacl_policies_dir_check_getposixacl(self): + acl = provision.POLICIES_ACL + domsid = passdb.get_global_sam_sid() + setntacl(self.lp, self.tempdir,acl,str(domsid), use_ntvfs=False) + facl = getntacl(self.lp, self.tempdir) + self.assertEquals(facl.as_sddl(domsid),acl) + posix_acl = smbd.get_sys_acl(self.tempdir, smb_acl.SMB_ACL_TYPE_ACCESS) + + LA_sid = security.dom_sid(str(domsid)+"-"+str(security.DOMAIN_RID_ADMINISTRATOR)) + BA_sid = security.dom_sid(security.SID_BUILTIN_ADMINISTRATORS) + SO_sid = security.dom_sid(security.SID_BUILTIN_SERVER_OPERATORS) + SY_sid = security.dom_sid(security.SID_NT_SYSTEM) + AU_sid = security.dom_sid(security.SID_NT_AUTHENTICATED_USERS) + PA_sid = security.dom_sid(str(domsid)+"-"+str(security.DOMAIN_RID_POLICY_ADMINS)) + + s4_passdb = passdb.PDB(self.lp.get("passdb backend")) + + # These assertions correct for current plugin_s4_dc selftest + # configuration. When other environments have a broad range of + # groups mapped via passdb, we can relax some of these checks + (LA_uid,LA_type) = s4_passdb.sid_to_id(LA_sid) + self.assertEquals(LA_type, idmap.ID_TYPE_UID) + (BA_gid,BA_type) = s4_passdb.sid_to_id(BA_sid) + self.assertEquals(BA_type, idmap.ID_TYPE_BOTH) + (SO_gid,SO_type) = s4_passdb.sid_to_id(SO_sid) + self.assertEquals(SO_type, idmap.ID_TYPE_BOTH) + (SY_gid,SY_type) = s4_passdb.sid_to_id(SY_sid) + self.assertEquals(SO_type, idmap.ID_TYPE_BOTH) + (AU_gid,AU_type) = s4_passdb.sid_to_id(AU_sid) + self.assertEquals(AU_type, idmap.ID_TYPE_BOTH) + (PA_gid,PA_type) = s4_passdb.sid_to_id(PA_sid) + self.assertEquals(PA_type, idmap.ID_TYPE_BOTH) + + self.assertEquals(posix_acl.count, 10) + + self.assertEquals(posix_acl.acl[0].a_type, smb_acl.SMB_ACL_GROUP) + self.assertEquals(posix_acl.acl[0].a_perm, 7) + self.assertEquals(posix_acl.acl[0].info.gid, BA_gid) + + self.assertEquals(posix_acl.acl[1].a_type, smb_acl.SMB_ACL_USER) + self.assertEquals(posix_acl.acl[1].a_perm, 7) + self.assertEquals(posix_acl.acl[1].info.uid, LA_uid) + + self.assertEquals(posix_acl.acl[2].a_type, smb_acl.SMB_ACL_OTHER) + self.assertEquals(posix_acl.acl[2].a_perm, 0) + + self.assertEquals(posix_acl.acl[3].a_type, smb_acl.SMB_ACL_USER_OBJ) + self.assertEquals(posix_acl.acl[3].a_perm, 7) + + self.assertEquals(posix_acl.acl[4].a_type, smb_acl.SMB_ACL_GROUP_OBJ) + self.assertEquals(posix_acl.acl[4].a_perm, 7) + + self.assertEquals(posix_acl.acl[5].a_type, smb_acl.SMB_ACL_GROUP) + self.assertEquals(posix_acl.acl[5].a_perm, 5) + self.assertEquals(posix_acl.acl[5].info.gid, SO_gid) + + self.assertEquals(posix_acl.acl[6].a_type, smb_acl.SMB_ACL_GROUP) + self.assertEquals(posix_acl.acl[6].a_perm, 7) + self.assertEquals(posix_acl.acl[6].info.gid, SY_gid) + + self.assertEquals(posix_acl.acl[7].a_type, smb_acl.SMB_ACL_GROUP) + self.assertEquals(posix_acl.acl[7].a_perm, 5) + self.assertEquals(posix_acl.acl[7].info.gid, AU_gid) + + self.assertEquals(posix_acl.acl[8].a_type, smb_acl.SMB_ACL_GROUP) + self.assertEquals(posix_acl.acl[8].a_perm, 7) + self.assertEquals(posix_acl.acl[8].info.gid, PA_gid) + + self.assertEquals(posix_acl.acl[9].a_type, smb_acl.SMB_ACL_MASK) + self.assertEquals(posix_acl.acl[9].a_perm, 7) + + +# check that it matches: +# user::rwx +# user:root:rwx (selftest user actually) +# group::rwx +# group:3000000:rwx +# group:3000001:r-x +# group:3000002:rwx +# group:3000003:r-x +# group:3000004:rwx +# mask::rwx +# other::--- + + + + def test_setntacl_policies_check_getposixacl(self): + acl = provision.POLICIES_ACL + + domsid = passdb.get_global_sam_sid() + setntacl(self.lp, self.tempf, acl, str(domsid), use_ntvfs=False) + facl = getntacl(self.lp, self.tempf) + self.assertEquals(facl.as_sddl(domsid),acl) + posix_acl = smbd.get_sys_acl(self.tempf, smb_acl.SMB_ACL_TYPE_ACCESS) + + LA_sid = security.dom_sid(str(domsid)+"-"+str(security.DOMAIN_RID_ADMINISTRATOR)) + BA_sid = security.dom_sid(security.SID_BUILTIN_ADMINISTRATORS) + SO_sid = security.dom_sid(security.SID_BUILTIN_SERVER_OPERATORS) + SY_sid = security.dom_sid(security.SID_NT_SYSTEM) + AU_sid = security.dom_sid(security.SID_NT_AUTHENTICATED_USERS) + PA_sid = security.dom_sid(str(domsid)+"-"+str(security.DOMAIN_RID_POLICY_ADMINS)) + + s4_passdb = passdb.PDB(self.lp.get("passdb backend")) + + # These assertions correct for current plugin_s4_dc selftest + # configuration. When other environments have a broad range of + # groups mapped via passdb, we can relax some of these checks + (LA_uid,LA_type) = s4_passdb.sid_to_id(LA_sid) + self.assertEquals(LA_type, idmap.ID_TYPE_UID) + (BA_gid,BA_type) = s4_passdb.sid_to_id(BA_sid) + self.assertEquals(BA_type, idmap.ID_TYPE_BOTH) + (SO_gid,SO_type) = s4_passdb.sid_to_id(SO_sid) + self.assertEquals(SO_type, idmap.ID_TYPE_BOTH) + (SY_gid,SY_type) = s4_passdb.sid_to_id(SY_sid) + self.assertEquals(SO_type, idmap.ID_TYPE_BOTH) + (AU_gid,AU_type) = s4_passdb.sid_to_id(AU_sid) + self.assertEquals(AU_type, idmap.ID_TYPE_BOTH) + (PA_gid,PA_type) = s4_passdb.sid_to_id(PA_sid) + self.assertEquals(PA_type, idmap.ID_TYPE_BOTH) + + self.assertEquals(posix_acl.count, 10) + + self.assertEquals(posix_acl.acl[0].a_type, smb_acl.SMB_ACL_GROUP) + self.assertEquals(posix_acl.acl[0].a_perm, 7) + self.assertEquals(posix_acl.acl[0].info.gid, BA_gid) + + self.assertEquals(posix_acl.acl[1].a_type, smb_acl.SMB_ACL_USER) + self.assertEquals(posix_acl.acl[1].a_perm, 6) + self.assertEquals(posix_acl.acl[1].info.uid, LA_uid) + + self.assertEquals(posix_acl.acl[2].a_type, smb_acl.SMB_ACL_OTHER) + self.assertEquals(posix_acl.acl[2].a_perm, 0) + + self.assertEquals(posix_acl.acl[3].a_type, smb_acl.SMB_ACL_USER_OBJ) + self.assertEquals(posix_acl.acl[3].a_perm, 6) + + self.assertEquals(posix_acl.acl[4].a_type, smb_acl.SMB_ACL_GROUP_OBJ) + self.assertEquals(posix_acl.acl[4].a_perm, 7) + + self.assertEquals(posix_acl.acl[5].a_type, smb_acl.SMB_ACL_GROUP) + self.assertEquals(posix_acl.acl[5].a_perm, 5) + self.assertEquals(posix_acl.acl[5].info.gid, SO_gid) + + self.assertEquals(posix_acl.acl[6].a_type, smb_acl.SMB_ACL_GROUP) + self.assertEquals(posix_acl.acl[6].a_perm, 7) + self.assertEquals(posix_acl.acl[6].info.gid, SY_gid) + + self.assertEquals(posix_acl.acl[7].a_type, smb_acl.SMB_ACL_GROUP) + self.assertEquals(posix_acl.acl[7].a_perm, 5) + self.assertEquals(posix_acl.acl[7].info.gid, AU_gid) + + self.assertEquals(posix_acl.acl[8].a_type, smb_acl.SMB_ACL_GROUP) + self.assertEquals(posix_acl.acl[8].a_perm, 7) + self.assertEquals(posix_acl.acl[8].info.gid, PA_gid) + + self.assertEquals(posix_acl.acl[9].a_type, smb_acl.SMB_ACL_MASK) + self.assertEquals(posix_acl.acl[9].a_perm, 7) + + +# check that it matches: +# user::rwx +# user:root:rwx (selftest user actually) +# group::rwx +# group:Local Admins:rwx +# group:3000000:r-x +# group:3000001:rwx +# group:3000002:r-x +# group:3000003:rwx +# mask::rwx +# other::--- + +# +# This is in this order in the NDR smb_acl (not re-orderded for display) +# a_type: GROUP +# a_perm: 7 +# uid: -1 +# gid: 10 +# a_type: USER +# a_perm: 6 +# uid: 0 (selftest user actually) +# gid: -1 +# a_type: OTHER +# a_perm: 0 +# uid: -1 +# gid: -1 +# a_type: USER_OBJ +# a_perm: 6 +# uid: -1 +# gid: -1 +# a_type: GROUP_OBJ +# a_perm: 7 +# uid: -1 +# gid: -1 +# a_type: GROUP +# a_perm: 5 +# uid: -1 +# gid: 3000020 +# a_type: GROUP +# a_perm: 7 +# uid: -1 +# gid: 3000000 +# a_type: GROUP +# a_perm: 5 +# uid: -1 +# gid: 3000001 +# a_type: GROUP +# a_perm: 7 +# uid: -1 +# gid: 3000003 +# a_type: MASK +# a_perm: 7 +# uid: -1 +# gid: -1 + +# + + def setUp(self): + super(PosixAclMappingTests, self).setUp() + s3conf = s3param.get_context() + s3conf.load(self.get_loadparm().configfile) + s3conf.set("xattr_tdb:file", os.path.join(self.tempdir,"xattr.tdb")) + self.lp = s3conf + self.tempf = os.path.join(self.tempdir, "test") + open(self.tempf, 'w').write("empty") + + def tearDown(self): + smbd.unlink(self.tempf) + os.unlink(os.path.join(self.tempdir,"xattr.tdb")) + super(PosixAclMappingTests, self).tearDown() diff --git a/python/samba/tests/provision.py b/python/samba/tests/provision.py new file mode 100644 index 00000000000..929e7074f75 --- /dev/null +++ b/python/samba/tests/provision.py @@ -0,0 +1,203 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007-2012 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.provision.""" + +import os +from samba.provision import ( + ProvisionNames, + ProvisionPaths, + ProvisionResult, + determine_netbios_name, + sanitize_server_role, + setup_secretsdb, + findnss, + ) +import samba.tests +from samba.tests import env_loadparm, TestCase + +def create_dummy_secretsdb(path, lp=None): + """Create a dummy secrets database for use in tests. + + :param path: Path to store the secrets db + :param lp: Optional loadparm context. A simple one will + be generated if not specified. + """ + if lp is None: + lp = env_loadparm() + paths = ProvisionPaths() + paths.secrets = path + paths.private_dir = os.path.dirname(path) + paths.keytab = "no.keytab" + paths.dns_keytab = "no.dns.keytab" + secrets_ldb = setup_secretsdb(paths, None, None, lp=lp) + secrets_ldb.transaction_commit() + return secrets_ldb + + +class ProvisionTestCase(samba.tests.TestCaseInTempDir): + """Some simple tests for individual functions in the provisioning code. + """ + + def test_setup_secretsdb(self): + path = os.path.join(self.tempdir, "secrets.ldb") + paths = ProvisionPaths() + secrets_tdb_path = os.path.join(self.tempdir, "secrets.tdb") + secrets_ntdb_path = os.path.join(self.tempdir, "secrets.ntdb") + paths.secrets = path + paths.private_dir = os.path.dirname(path) + paths.keytab = "no.keytab" + paths.dns_keytab = "no.dns.keytab" + ldb = setup_secretsdb(paths, None, None, lp=env_loadparm()) + try: + self.assertEquals("LSA Secrets", + ldb.searchone(basedn="CN=LSA Secrets", attribute="CN")) + finally: + del ldb + os.unlink(path) + if os.path.exists(secrets_tdb_path): + os.unlink(secrets_tdb_path) + if os.path.exists(secrets_ntdb_path): + os.unlink(secrets_ntdb_path) + +class FindNssTests(TestCase): + """Test findnss() function.""" + + def test_nothing(self): + def x(y): + raise KeyError + self.assertRaises(KeyError, findnss, x, []) + + def test_first(self): + self.assertEquals("bla", findnss(lambda x: "bla", ["bla"])) + + def test_skip_first(self): + def x(y): + if y != "bla": + raise KeyError + return "ha" + self.assertEquals("ha", findnss(x, ["bloe", "bla"])) + + +class Disabled(object): + + def test_setup_templatesdb(self): + raise NotImplementedError(self.test_setup_templatesdb) + + def test_setup_registry(self): + raise NotImplementedError(self.test_setup_registry) + + def test_setup_samdb_rootdse(self): + raise NotImplementedError(self.test_setup_samdb_rootdse) + + def test_setup_samdb_partitions(self): + raise NotImplementedError(self.test_setup_samdb_partitions) + + def test_provision_dns(self): + raise NotImplementedError(self.test_provision_dns) + + def test_provision_ldapbase(self): + raise NotImplementedError(self.test_provision_ldapbase) + + def test_provision_guess(self): + raise NotImplementedError(self.test_provision_guess) + + def test_join_domain(self): + raise NotImplementedError(self.test_join_domain) + + def test_vampire(self): + raise NotImplementedError(self.test_vampire) + + +class SanitizeServerRoleTests(TestCase): + + def test_same(self): + self.assertEquals("standalone server", + sanitize_server_role("standalone server")) + self.assertEquals("member server", + sanitize_server_role("member server")) + + def test_invalid(self): + self.assertRaises(ValueError, sanitize_server_role, "foo") + + def test_valid(self): + self.assertEquals( + "standalone server", + sanitize_server_role("ROLE_STANDALONE")) + self.assertEquals( + "standalone server", + sanitize_server_role("standalone")) + self.assertEquals( + "active directory domain controller", + sanitize_server_role("domain controller")) + + +class DummyLogger(object): + + def __init__(self): + self.entries = [] + + def info(self, text, *args): + self.entries.append(("INFO", text % args)) + + +class ProvisionResultTests(TestCase): + + def report_logger(self, result): + logger = DummyLogger() + result.report_logger(logger) + return logger.entries + + def base_result(self): + result = ProvisionResult() + result.server_role = "domain controller" + result.names = ProvisionNames() + result.names.hostname = "hostnaam" + result.names.domain = "DOMEIN" + result.names.dnsdomain = "dnsdomein" + result.domainsid = "S1-1-1" + result.paths = ProvisionPaths() + return result + + def test_basic_report_logger(self): + result = self.base_result() + entries = self.report_logger(result) + self.assertEquals(entries, [ + ('INFO', 'Once the above files are installed, your Samba4 server ' + 'will be ready to use'), + ('INFO', 'Server Role: domain controller'), + ('INFO', 'Hostname: hostnaam'), + ('INFO', 'NetBIOS Domain: DOMEIN'), + ('INFO', 'DNS Domain: dnsdomein'), + ('INFO', 'DOMAIN SID: S1-1-1')]) + + def test_report_logger_adminpass(self): + result = self.base_result() + result.adminpass_generated = True + result.adminpass = "geheim" + entries = self.report_logger(result) + self.assertEquals(entries[1], + ("INFO", 'Admin password: geheim')) + + +class DetermineNetbiosNameTests(TestCase): + + def test_limits_to_15(self): + self.assertEquals("A" * 15, determine_netbios_name("a" * 30)) + + def test_strips_invalid(self): + self.assertEquals("BLABLA", determine_netbios_name("bla/bla")) diff --git a/python/samba/tests/registry.py b/python/samba/tests/registry.py new file mode 100644 index 00000000000..8016a0bb686 --- /dev/null +++ b/python/samba/tests/registry.py @@ -0,0 +1,60 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.registry.""" + +import os +from samba import registry +import samba.tests + +class HelperTests(samba.tests.TestCase): + + def test_predef_to_name(self): + self.assertEquals("HKEY_LOCAL_MACHINE", + registry.get_predef_name(0x80000002)) + + def test_str_regtype(self): + self.assertEquals("REG_DWORD", registry.str_regtype(4)) + + + +class HiveTests(samba.tests.TestCaseInTempDir): + + def setUp(self): + super(HiveTests, self).setUp() + self.hive_path = os.path.join(self.tempdir, "ldb_new.ldb") + self.hive = registry.open_ldb(self.hive_path) + + def tearDown(self): + del self.hive + os.unlink(self.hive_path) + super(HiveTests, self).tearDown() + + def test_ldb_new(self): + self.assertTrue(self.hive is not None) + + #def test_flush(self): + # self.hive.flush() + + #def test_del_value(self): + # self.hive.del_value("FOO") + + +class RegistryTests(samba.tests.TestCase): + + def test_new(self): + self.registry = registry.Registry() diff --git a/python/samba/tests/samba3.py b/python/samba/tests/samba3.py new file mode 100644 index 00000000000..0a7f13c66fa --- /dev/null +++ b/python/samba/tests/samba3.py @@ -0,0 +1,219 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.samba3.""" + +from samba.samba3 import ( + Registry, + WinsDatabase, + IdmapDatabase, + ) +from samba.samba3 import passdb +from samba.samba3 import param as s3param +from samba.tests import TestCase, TestCaseInTempDir +from samba.dcerpc.security import dom_sid +import os + + +for p in [ "../../../../../testdata/samba3", "../../../../testdata/samba3" ]: + DATADIR = os.path.join(os.path.dirname(__file__), p) + if os.path.exists(DATADIR): + break + + +class RegistryTestCase(TestCase): + + def setUp(self): + super(RegistryTestCase, self).setUp() + self.registry = Registry(os.path.join(DATADIR, "registry.tdb")) + + def tearDown(self): + self.registry.close() + super(RegistryTestCase, self).tearDown() + + def test_length(self): + self.assertEquals(28, len(self.registry)) + + def test_keys(self): + self.assertTrue("HKLM" in self.registry.keys()) + + def test_subkeys(self): + self.assertEquals(["SOFTWARE", "SYSTEM"], self.registry.subkeys("HKLM")) + + def test_values(self): + self.assertEquals({'DisplayName': (1L, 'E\x00v\x00e\x00n\x00t\x00 \x00L\x00o\x00g\x00\x00\x00'), + 'ErrorControl': (4L, '\x01\x00\x00\x00')}, + self.registry.values("HKLM/SYSTEM/CURRENTCONTROLSET/SERVICES/EVENTLOG")) + + +class PassdbTestCase(TestCaseInTempDir): + + def setUp(self): + super (PassdbTestCase, self).setUp() + os.system("cp -r %s %s" % (DATADIR, self.tempdir)) + datadir = os.path.join(self.tempdir, "samba3") + + self.lp = s3param.get_context() + self.lp.load(os.path.join(datadir, "smb.conf")) + self.lp.set("private dir", datadir) + self.lp.set("state directory", datadir) + self.lp.set("lock directory", datadir) + passdb.set_secrets_dir(datadir) + self.pdb = passdb.PDB("tdbsam") + + def tearDown(self): + self.lp = [] + self.pdb = [] + os.system("rm -rf %s" % os.path.join(self.tempdir, "samba3")) + super(PassdbTestCase, self).tearDown() + + def test_param(self): + self.assertEquals("BEDWYR", self.lp.get("netbios name")) + self.assertEquals("SAMBA", self.lp.get("workgroup")) + self.assertEquals("USER", self.lp.get("security")) + + def test_policy(self): + policy = self.pdb.get_account_policy() + self.assertEquals(0, policy['bad lockout attempt']) + self.assertEquals(-1, policy['disconnect time']) + self.assertEquals(0, policy['lockout duration']) + self.assertEquals(999999999, policy['maximum password age']) + self.assertEquals(0, policy['minimum password age']) + self.assertEquals(5, policy['min password length']) + self.assertEquals(0, policy['password history']) + self.assertEquals(0, policy['refuse machine password change']) + self.assertEquals(0, policy['reset count minutes']) + self.assertEquals(0, policy['user must logon to change password']) + + def test_get_sid(self): + domain_sid = passdb.get_global_sam_sid() + self.assertEquals(dom_sid("S-1-5-21-2470180966-3899876309-2637894779"), domain_sid) + + def test_usernames(self): + userlist = self.pdb.search_users(0) + self.assertEquals(3, len(userlist)) + + def test_getuser(self): + user = self.pdb.getsampwnam("root") + + self.assertEquals(16, user.acct_ctrl) + self.assertEquals("", user.acct_desc) + self.assertEquals(0, user.bad_password_count) + self.assertEquals(0, user.bad_password_time) + self.assertEquals(0, user.code_page) + self.assertEquals(0, user.country_code) + self.assertEquals("", user.dir_drive) + self.assertEquals("BEDWYR", user.domain) + self.assertEquals("root", user.full_name) + self.assertEquals(dom_sid('S-1-5-21-2470180966-3899876309-2637894779-513'), user.group_sid) + self.assertEquals("\\\\BEDWYR\\root", user.home_dir) + self.assertEquals([-1 for i in range(21)], user.hours) + self.assertEquals(21, user.hours_len) + self.assertEquals(9223372036854775807, user.kickoff_time) + self.assertEquals(None, user.lanman_passwd) + self.assertEquals(9223372036854775807, user.logoff_time) + self.assertEquals(0, user.logon_count) + self.assertEquals(168, user.logon_divs) + self.assertEquals("", user.logon_script) + self.assertEquals(0, user.logon_time) + self.assertEquals("", user.munged_dial) + self.assertEquals('\x87\x8d\x80\x14`l\xda)gzD\xef\xa15?\xc7', user.nt_passwd) + self.assertEquals("", user.nt_username) + self.assertEquals(1125418267, user.pass_can_change_time) + self.assertEquals(1125418267, user.pass_last_set_time) + self.assertEquals(2125418266, user.pass_must_change_time) + self.assertEquals(None, user.plaintext_passwd) + self.assertEquals("\\\\BEDWYR\\root\\profile", user.profile_path) + self.assertEquals(None, user.pw_history) + self.assertEquals(dom_sid("S-1-5-21-2470180966-3899876309-2637894779-1000"), user.user_sid) + self.assertEquals("root", user.username) + self.assertEquals("", user.workstations) + + def test_group_length(self): + grouplist = self.pdb.enum_group_mapping() + self.assertEquals(13, len(grouplist)) + + def test_get_group(self): + group = self.pdb.getgrsid(dom_sid("S-1-5-32-544")) + self.assertEquals("Administrators", group.nt_name) + self.assertEquals(-1, group.gid) + self.assertEquals(5, group.sid_name_use) + + def test_groupsids(self): + grouplist = self.pdb.enum_group_mapping() + sids = [] + for g in grouplist: + sids.append(str(g.sid)) + self.assertTrue("S-1-5-32-544" in sids) + self.assertTrue("S-1-5-32-545" in sids) + self.assertTrue("S-1-5-32-546" in sids) + self.assertTrue("S-1-5-32-548" in sids) + self.assertTrue("S-1-5-32-549" in sids) + self.assertTrue("S-1-5-32-550" in sids) + self.assertTrue("S-1-5-32-551" in sids) + + def test_alias_length(self): + aliaslist = self.pdb.search_aliases() + self.assertEquals(1, len(aliaslist)) + self.assertEquals("Jelmers NT Group", aliaslist[0]['account_name']) + + +class WinsDatabaseTestCase(TestCase): + + def setUp(self): + super(WinsDatabaseTestCase, self).setUp() + self.winsdb = WinsDatabase(os.path.join(DATADIR, "wins.dat")) + + def test_length(self): + self.assertEquals(22, len(self.winsdb)) + + def test_first_entry(self): + self.assertEqual((1124185120, ["192.168.1.5"], 0x64), self.winsdb["ADMINISTRATOR#03"]) + + def tearDown(self): + self.winsdb.close() + super(WinsDatabaseTestCase, self).tearDown() + + +class IdmapDbTestCase(TestCase): + + def setUp(self): + super(IdmapDbTestCase, self).setUp() + self.idmapdb = IdmapDatabase(os.path.join(DATADIR, + "winbindd_idmap.tdb")) + + def test_user_hwm(self): + self.assertEquals(10000, self.idmapdb.get_user_hwm()) + + def test_group_hwm(self): + self.assertEquals(10002, self.idmapdb.get_group_hwm()) + + def test_uids(self): + self.assertEquals(1, len(list(self.idmapdb.uids()))) + + def test_gids(self): + self.assertEquals(3, len(list(self.idmapdb.gids()))) + + def test_get_user_sid(self): + self.assertEquals("S-1-5-21-58189338-3053988021-627566699-501", self.idmapdb.get_user_sid(65534)) + + def test_get_group_sid(self): + self.assertEquals("S-1-5-21-2447931902-1787058256-3961074038-3007", self.idmapdb.get_group_sid(10001)) + + def tearDown(self): + self.idmapdb.close() + super(IdmapDbTestCase, self).tearDown() diff --git a/python/samba/tests/samba3sam.py b/python/samba/tests/samba3sam.py new file mode 100644 index 00000000000..9c017fb79c3 --- /dev/null +++ b/python/samba/tests/samba3sam.py @@ -0,0 +1,1125 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2005-2008 +# Copyright (C) Martin Kuehl <mkhl@samba.org> 2006 +# +# This is a Python port of the original in testprogs/ejs/samba3sam.js +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP.""" + +import os +import ldb +from ldb import SCOPE_DEFAULT, SCOPE_BASE +from samba import Ldb, substitute_var +from samba.tests import TestCaseInTempDir, env_loadparm +import samba.dcerpc.security +import samba.ndr +from samba.auth import system_session +from operator import attrgetter + + +def read_datafile(filename): + paths = [ "../../../../../testdata/samba3", + "../../../../testdata/samba3" ] + for p in paths: + datadir = os.path.join(os.path.dirname(__file__), p) + if os.path.exists(datadir): + break + return open(os.path.join(datadir, filename), 'r').read() + +def ldb_debug(l, text): + print text + + +class MapBaseTestCase(TestCaseInTempDir): + """Base test case for mapping tests.""" + + def setup_modules(self, ldb, s3, s4): + ldb.add({"dn": "@MAP=samba3sam", + "@FROM": s4.basedn, + "@TO": "sambaDomainName=TESTS," + s3.basedn}) + + ldb.add({"dn": "@MODULES", + "@LIST": "rootdse,paged_results,server_sort,asq,samldb,password_hash,operational,objectguid,rdn_name,samba3sam,samba3sid,show_deleted,partition"}) + + ldb.add({"dn": "@PARTITION", + "partition": ["%s" % (s4.basedn_casefold), + "%s" % (s3.basedn_casefold)], + "replicateEntries": ["@ATTRIBUTES", "@INDEXLIST"], + "modules": "*:"}) + + def setUp(self): + self.lp = env_loadparm() + self.lp.set("workgroup", "TESTS") + self.lp.set("netbios name", "TESTS") + super(MapBaseTestCase, self).setUp() + + def make_dn(basedn, rdn): + return "%s,sambaDomainName=TESTS,%s" % (rdn, basedn) + + def make_s4dn(basedn, rdn): + return "%s,%s" % (rdn, basedn) + + self.ldbfile = os.path.join(self.tempdir, "test.ldb") + self.ldburl = "tdb://" + self.ldbfile + + tempdir = self.tempdir + + class Target: + """Simple helper class that contains data for a specific SAM + connection.""" + + def __init__(self, basedn, dn, lp): + self.db = Ldb(lp=lp, session_info=system_session()) + self.db.set_opaque("skip_allocate_sids", "true"); + self.basedn = basedn + self.basedn_casefold = ldb.Dn(self.db, basedn).get_casefold() + self.substvars = {"BASEDN": self.basedn} + self.file = os.path.join(tempdir, "%s.ldb" % self.basedn_casefold) + self.url = "tdb://" + self.file + self._dn = dn + + def dn(self, rdn): + return self._dn(self.basedn, rdn) + + def connect(self): + return self.db.connect(self.url) + + def setup_data(self, path): + self.add_ldif(read_datafile(path)) + + def subst(self, text): + return substitute_var(text, self.substvars) + + def add_ldif(self, ldif): + self.db.add_ldif(self.subst(ldif)) + + def modify_ldif(self, ldif): + self.db.modify_ldif(self.subst(ldif)) + + self.samba4 = Target("dc=vernstok,dc=nl", make_s4dn, self.lp) + self.samba3 = Target("cn=Samba3Sam", make_dn, self.lp) + + self.samba3.connect() + self.samba4.connect() + + def tearDown(self): + os.unlink(self.ldbfile) + os.unlink(self.samba3.file) + os.unlink(self.samba4.file) + pdir = "%s.d" % self.ldbfile + mdata = os.path.join(pdir, "metadata.tdb") + if os.path.exists(mdata): + os.unlink(mdata) + os.rmdir(pdir) + super(MapBaseTestCase, self).tearDown() + + def assertSidEquals(self, text, ndr_sid): + sid_obj1 = samba.ndr.ndr_unpack(samba.dcerpc.security.dom_sid, + str(ndr_sid[0])) + sid_obj2 = samba.dcerpc.security.dom_sid(text) + self.assertEquals(sid_obj1, sid_obj2) + + +class Samba3SamTestCase(MapBaseTestCase): + + def setUp(self): + super(Samba3SamTestCase, self).setUp() + ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session()) + ldb.set_opaque("skip_allocate_sids", "true"); + self.samba3.setup_data("samba3.ldif") + ldif = read_datafile("provision_samba3sam.ldif") + ldb.add_ldif(self.samba4.subst(ldif)) + self.setup_modules(ldb, self.samba3, self.samba4) + del ldb + self.ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session()) + self.ldb.set_opaque("skip_allocate_sids", "true"); + + def test_search_non_mapped(self): + """Looking up by non-mapped attribute""" + msg = self.ldb.search(expression="(cn=Administrator)") + self.assertEquals(len(msg), 1) + self.assertEquals(msg[0]["cn"], "Administrator") + + def test_search_non_mapped(self): + """Looking up by mapped attribute""" + msg = self.ldb.search(expression="(name=Backup Operators)") + self.assertEquals(len(msg), 1) + self.assertEquals(str(msg[0]["name"]), "Backup Operators") + + def test_old_name_of_renamed(self): + """Looking up by old name of renamed attribute""" + msg = self.ldb.search(expression="(displayName=Backup Operators)") + self.assertEquals(len(msg), 0) + + def test_mapped_containing_sid(self): + """Looking up mapped entry containing SID""" + msg = self.ldb.search(expression="(cn=Replicator)") + self.assertEquals(len(msg), 1) + self.assertEquals(str(msg[0].dn), + "cn=Replicator,ou=Groups,dc=vernstok,dc=nl") + self.assertTrue("objectSid" in msg[0]) + self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552", + msg[0]["objectSid"]) + oc = set(msg[0]["objectClass"]) + self.assertEquals(oc, set(["group"])) + + def test_search_by_objclass(self): + """Looking up by objectClass""" + msg = self.ldb.search(expression="(|(objectClass=user)(cn=Administrator))") + self.assertEquals(set([str(m.dn) for m in msg]), + set(["unixName=Administrator,ou=Users,dc=vernstok,dc=nl", + "unixName=nobody,ou=Users,dc=vernstok,dc=nl"])) + + def test_s3sam_modify(self): + # Adding a record that will be fallbacked + self.ldb.add({"dn": "cn=Foo", + "foo": "bar", + "blah": "Blie", + "cn": "Foo", + "showInAdvancedViewOnly": "TRUE"} + ) + + # Checking for existence of record (local) + # TODO: This record must be searched in the local database, which is + # currently only supported for base searches + # msg = ldb.search(expression="(cn=Foo)", ['foo','blah','cn','showInAdvancedViewOnly')] + # TODO: Actually, this version should work as well but doesn't... + # + # + msg = self.ldb.search(expression="(cn=Foo)", base="cn=Foo", + scope=SCOPE_BASE, + attrs=['foo','blah','cn','showInAdvancedViewOnly']) + self.assertEquals(len(msg), 1) + self.assertEquals(str(msg[0]["showInAdvancedViewOnly"]), "TRUE") + self.assertEquals(str(msg[0]["foo"]), "bar") + self.assertEquals(str(msg[0]["blah"]), "Blie") + + # Adding record that will be mapped + self.ldb.add({"dn": "cn=Niemand,cn=Users,dc=vernstok,dc=nl", + "objectClass": "user", + "unixName": "bin", + "sambaUnicodePwd": "geheim", + "cn": "Niemand"}) + + # Checking for existence of record (remote) + msg = self.ldb.search(expression="(unixName=bin)", + attrs=['unixName','cn','dn', 'sambaUnicodePwd']) + self.assertEquals(len(msg), 1) + self.assertEquals(str(msg[0]["cn"]), "Niemand") + self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim") + + # Checking for existence of record (local && remote) + msg = self.ldb.search(expression="(&(unixName=bin)(sambaUnicodePwd=geheim))", + attrs=['unixName','cn','dn', 'sambaUnicodePwd']) + self.assertEquals(len(msg), 1) # TODO: should check with more records + self.assertEquals(str(msg[0]["cn"]), "Niemand") + self.assertEquals(str(msg[0]["unixName"]), "bin") + self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim") + + # Checking for existence of record (local || remote) + msg = self.ldb.search(expression="(|(unixName=bin)(sambaUnicodePwd=geheim))", + attrs=['unixName','cn','dn', 'sambaUnicodePwd']) + #print "got %d replies" % len(msg) + self.assertEquals(len(msg), 1) # TODO: should check with more records + self.assertEquals(str(msg[0]["cn"]), "Niemand") + self.assertEquals(str(msg[0]["unixName"]), "bin") + self.assertEquals(str(msg[0]["sambaUnicodePwd"]), "geheim") + + # Checking for data in destination database + msg = self.samba3.db.search(expression="(cn=Niemand)") + self.assertTrue(len(msg) >= 1) + self.assertEquals(str(msg[0]["sambaSID"]), + "S-1-5-21-4231626423-2410014848-2360679739-2001") + self.assertEquals(str(msg[0]["displayName"]), "Niemand") + + # Adding attribute... + self.ldb.modify_ldif(""" +dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl +changetype: modify +add: description +description: Blah +""") + + # Checking whether changes are still there... + msg = self.ldb.search(expression="(cn=Niemand)") + self.assertTrue(len(msg) >= 1) + self.assertEquals(str(msg[0]["cn"]), "Niemand") + self.assertEquals(str(msg[0]["description"]), "Blah") + + # Modifying attribute... + self.ldb.modify_ldif(""" +dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl +changetype: modify +replace: description +description: Blie +""") + + # Checking whether changes are still there... + msg = self.ldb.search(expression="(cn=Niemand)") + self.assertTrue(len(msg) >= 1) + self.assertEquals(str(msg[0]["description"]), "Blie") + + # Deleting attribute... + self.ldb.modify_ldif(""" +dn: cn=Niemand,cn=Users,dc=vernstok,dc=nl +changetype: modify +delete: description +""") + + # Checking whether changes are no longer there... + msg = self.ldb.search(expression="(cn=Niemand)") + self.assertTrue(len(msg) >= 1) + self.assertTrue(not "description" in msg[0]) + + # Renaming record... + self.ldb.rename("cn=Niemand,cn=Users,dc=vernstok,dc=nl", + "cn=Niemand2,cn=Users,dc=vernstok,dc=nl") + + # Checking whether DN has changed... + msg = self.ldb.search(expression="(cn=Niemand2)") + self.assertEquals(len(msg), 1) + self.assertEquals(str(msg[0].dn), + "cn=Niemand2,cn=Users,dc=vernstok,dc=nl") + + # Deleting record... + self.ldb.delete("cn=Niemand2,cn=Users,dc=vernstok,dc=nl") + + # Checking whether record is gone... + msg = self.ldb.search(expression="(cn=Niemand2)") + self.assertEquals(len(msg), 0) + + +class MapTestCase(MapBaseTestCase): + + def setUp(self): + super(MapTestCase, self).setUp() + ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session()) + ldb.set_opaque("skip_allocate_sids", "true"); + ldif = read_datafile("provision_samba3sam.ldif") + ldb.add_ldif(self.samba4.subst(ldif)) + self.setup_modules(ldb, self.samba3, self.samba4) + del ldb + self.ldb = Ldb(self.ldburl, lp=self.lp, session_info=system_session()) + self.ldb.set_opaque("skip_allocate_sids", "true"); + + def test_map_search(self): + """Running search tests on mapped data.""" + self.samba3.db.add({ + "dn": "sambaDomainName=TESTS," + self.samba3.basedn, + "objectclass": ["sambaDomain", "top"], + "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739", + "sambaNextRid": "2000", + "sambaDomainName": "TESTS" + }) + + # Add a set of split records + self.ldb.add_ldif(""" +dn: """+ self.samba4.dn("cn=Domain Users") + """ +objectClass: group +cn: Domain Users +objectSid: S-1-5-21-4231626423-2410014848-2360679739-513 +""") + + # Add a set of split records + self.ldb.add_ldif(""" +dn: """+ self.samba4.dn("cn=X") + """ +objectClass: user +cn: X +codePage: x +revision: x +dnsHostName: x +nextRid: y +lastLogon: x +description: x +objectSid: S-1-5-21-4231626423-2410014848-2360679739-552 +""") + + self.ldb.add({ + "dn": self.samba4.dn("cn=Y"), + "objectClass": "top", + "cn": "Y", + "codePage": "x", + "revision": "x", + "dnsHostName": "y", + "nextRid": "y", + "lastLogon": "y", + "description": "x"}) + + self.ldb.add({ + "dn": self.samba4.dn("cn=Z"), + "objectClass": "top", + "cn": "Z", + "codePage": "x", + "revision": "y", + "dnsHostName": "z", + "nextRid": "y", + "lastLogon": "z", + "description": "y"}) + + # Add a set of remote records + + self.samba3.db.add({ + "dn": self.samba3.dn("cn=A"), + "objectClass": "posixAccount", + "cn": "A", + "sambaNextRid": "x", + "sambaBadPasswordCount": "x", + "sambaLogonTime": "x", + "description": "x", + "sambaSID": "S-1-5-21-4231626423-2410014848-2360679739-552", + "sambaPrimaryGroupSID": "S-1-5-21-4231626423-2410014848-2360679739-512"}) + + self.samba3.db.add({ + "dn": self.samba3.dn("cn=B"), + "objectClass": "top", + "cn": "B", + "sambaNextRid": "x", + "sambaBadPasswordCount": "x", + "sambaLogonTime": "y", + "description": "x"}) + + self.samba3.db.add({ + "dn": self.samba3.dn("cn=C"), + "objectClass": "top", + "cn": "C", + "sambaNextRid": "x", + "sambaBadPasswordCount": "y", + "sambaLogonTime": "z", + "description": "y"}) + + # Testing search by DN + + # Search remote record by local DN + dn = self.samba4.dn("cn=A") + res = self.ldb.search(dn, scope=SCOPE_BASE, + attrs=["dnsHostName", "lastLogon"]) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertTrue(not "dnsHostName" in res[0]) + self.assertEquals(str(res[0]["lastLogon"]), "x") + + # Search remote record by remote DN + dn = self.samba3.dn("cn=A") + res = self.samba3.db.search(dn, scope=SCOPE_BASE, + attrs=["dnsHostName", "lastLogon", "sambaLogonTime"]) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertTrue(not "dnsHostName" in res[0]) + self.assertTrue(not "lastLogon" in res[0]) + self.assertEquals(str(res[0]["sambaLogonTime"]), "x") + + # Search split record by local DN + dn = self.samba4.dn("cn=X") + res = self.ldb.search(dn, scope=SCOPE_BASE, + attrs=["dnsHostName", "lastLogon"]) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertEquals(str(res[0]["dnsHostName"]), "x") + self.assertEquals(str(res[0]["lastLogon"]), "x") + + # Search split record by remote DN + dn = self.samba3.dn("cn=X") + res = self.samba3.db.search(dn, scope=SCOPE_BASE, + attrs=["dnsHostName", "lastLogon", "sambaLogonTime"]) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertTrue(not "dnsHostName" in res[0]) + self.assertTrue(not "lastLogon" in res[0]) + self.assertEquals(str(res[0]["sambaLogonTime"]), "x") + + # Testing search by attribute + + # Search by ignored attribute + res = self.ldb.search(expression="(revision=x)", scope=SCOPE_DEFAULT, + attrs=["dnsHostName", "lastLogon"]) + self.assertEquals(len(res), 2) + res = sorted(res, key=attrgetter('dn')) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X")) + self.assertEquals(str(res[0]["dnsHostName"]), "x") + self.assertEquals(str(res[0]["lastLogon"]), "x") + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y")) + self.assertEquals(str(res[1]["dnsHostName"]), "y") + self.assertEquals(str(res[1]["lastLogon"]), "y") + + # Search by kept attribute + res = self.ldb.search(expression="(description=y)", + scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon"]) + self.assertEquals(len(res), 2) + res = sorted(res, key=attrgetter('dn')) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=C")) + self.assertTrue(not "dnsHostName" in res[0]) + self.assertEquals(str(res[0]["lastLogon"]), "z") + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z")) + self.assertEquals(str(res[1]["dnsHostName"]), "z") + self.assertEquals(str(res[1]["lastLogon"]), "z") + + # Search by renamed attribute + res = self.ldb.search(expression="(badPwdCount=x)", scope=SCOPE_DEFAULT, + attrs=["dnsHostName", "lastLogon"]) + self.assertEquals(len(res), 2) + res = sorted(res, key=attrgetter('dn')) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A")) + self.assertTrue(not "dnsHostName" in res[0]) + self.assertEquals(str(res[0]["lastLogon"]), "x") + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B")) + self.assertTrue(not "dnsHostName" in res[1]) + self.assertEquals(str(res[1]["lastLogon"]), "y") + + # Search by converted attribute + # TODO: + # Using the SID directly in the parse tree leads to conversion + # errors, letting the search fail with no results. + #res = self.ldb.search("(objectSid=S-1-5-21-4231626423-2410014848-2360679739-552)", scope=SCOPE_DEFAULT, attrs) + res = self.ldb.search(expression="(objectSid=*)", base=None, scope=SCOPE_DEFAULT, attrs=["dnsHostName", "lastLogon", "objectSid"]) + self.assertEquals(len(res), 4) + res = sorted(res, key=attrgetter('dn')) + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X")) + self.assertEquals(str(res[1]["dnsHostName"]), "x") + self.assertEquals(str(res[1]["lastLogon"]), "x") + self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552", + res[1]["objectSid"]) + self.assertTrue("objectSid" in res[1]) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A")) + self.assertTrue(not "dnsHostName" in res[0]) + self.assertEquals(str(res[0]["lastLogon"]), "x") + self.assertSidEquals("S-1-5-21-4231626423-2410014848-2360679739-552", + res[0]["objectSid"]) + self.assertTrue("objectSid" in res[0]) + + # Search by generated attribute + # In most cases, this even works when the mapping is missing + # a `convert_operator' by enumerating the remote db. + res = self.ldb.search(expression="(primaryGroupID=512)", + attrs=["dnsHostName", "lastLogon", "primaryGroupID"]) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A")) + self.assertTrue(not "dnsHostName" in res[0]) + self.assertEquals(str(res[0]["lastLogon"]), "x") + self.assertEquals(str(res[0]["primaryGroupID"]), "512") + + # Note that Xs "objectSid" seems to be fine in the previous search for + # "objectSid"... + #res = ldb.search(expression="(primaryGroupID=*)", NULL, ldb. SCOPE_DEFAULT, attrs) + #print len(res) + " results found" + #for i in range(len(res)): + # for (obj in res[i]) { + # print obj + ": " + res[i][obj] + # } + # print "---" + # + + # Search by remote name of renamed attribute */ + res = self.ldb.search(expression="(sambaBadPasswordCount=*)", + attrs=["dnsHostName", "lastLogon"]) + self.assertEquals(len(res), 0) + + # Search by objectClass + attrs = ["dnsHostName", "lastLogon", "objectClass"] + res = self.ldb.search(expression="(objectClass=user)", attrs=attrs) + self.assertEquals(len(res), 2) + res = sorted(res, key=attrgetter('dn')) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A")) + self.assertTrue(not "dnsHostName" in res[0]) + self.assertEquals(str(res[0]["lastLogon"]), "x") + self.assertEquals(str(res[0]["objectClass"][0]), "user") + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X")) + self.assertEquals(str(res[1]["dnsHostName"]), "x") + self.assertEquals(str(res[1]["lastLogon"]), "x") + self.assertEquals(str(res[1]["objectClass"][0]), "user") + + # Prove that the objectClass is actually used for the search + res = self.ldb.search(expression="(|(objectClass=user)(badPwdCount=x))", + attrs=attrs) + self.assertEquals(len(res), 3) + res = sorted(res, key=attrgetter('dn')) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A")) + self.assertTrue(not "dnsHostName" in res[0]) + self.assertEquals(str(res[0]["lastLogon"]), "x") + self.assertEquals(res[0]["objectClass"][0], "user") + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B")) + self.assertTrue(not "dnsHostName" in res[1]) + self.assertEquals(str(res[1]["lastLogon"]), "y") + self.assertEquals(set(res[1]["objectClass"]), set(["top"])) + self.assertEquals(str(res[2].dn), self.samba4.dn("cn=X")) + self.assertEquals(str(res[2]["dnsHostName"]), "x") + self.assertEquals(str(res[2]["lastLogon"]), "x") + self.assertEquals(str(res[2]["objectClass"][0]), "user") + + # Testing search by parse tree + + # Search by conjunction of local attributes + res = self.ldb.search(expression="(&(codePage=x)(revision=x))", + attrs=["dnsHostName", "lastLogon"]) + self.assertEquals(len(res), 2) + res = sorted(res, key=attrgetter('dn')) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X")) + self.assertEquals(str(res[0]["dnsHostName"]), "x") + self.assertEquals(str(res[0]["lastLogon"]), "x") + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y")) + self.assertEquals(str(res[1]["dnsHostName"]), "y") + self.assertEquals(str(res[1]["lastLogon"]), "y") + + # Search by conjunction of remote attributes + res = self.ldb.search(expression="(&(lastLogon=x)(description=x))", + attrs=["dnsHostName", "lastLogon"]) + self.assertEquals(len(res), 2) + res = sorted(res, key=attrgetter('dn')) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A")) + self.assertTrue(not "dnsHostName" in res[0]) + self.assertEquals(str(res[0]["lastLogon"]), "x") + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X")) + self.assertEquals(str(res[1]["dnsHostName"]), "x") + self.assertEquals(str(res[1]["lastLogon"]), "x") + + # Search by conjunction of local and remote attribute + res = self.ldb.search(expression="(&(codePage=x)(description=x))", + attrs=["dnsHostName", "lastLogon"]) + self.assertEquals(len(res), 2) + res = sorted(res, key=attrgetter('dn')) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X")) + self.assertEquals(str(res[0]["dnsHostName"]), "x") + self.assertEquals(str(res[0]["lastLogon"]), "x") + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y")) + self.assertEquals(str(res[1]["dnsHostName"]), "y") + self.assertEquals(str(res[1]["lastLogon"]), "y") + + # Search by conjunction of local and remote attribute w/o match + attrs = ["dnsHostName", "lastLogon"] + res = self.ldb.search(expression="(&(codePage=x)(nextRid=x))", + attrs=attrs) + self.assertEquals(len(res), 0) + res = self.ldb.search(expression="(&(revision=x)(lastLogon=z))", + attrs=attrs) + self.assertEquals(len(res), 0) + + # Search by disjunction of local attributes + res = self.ldb.search(expression="(|(revision=x)(dnsHostName=x))", + attrs=["dnsHostName", "lastLogon"]) + self.assertEquals(len(res), 2) + res = sorted(res, key=attrgetter('dn')) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=X")) + self.assertEquals(str(res[0]["dnsHostName"]), "x") + self.assertEquals(str(res[0]["lastLogon"]), "x") + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y")) + self.assertEquals(str(res[1]["dnsHostName"]), "y") + self.assertEquals(str(res[1]["lastLogon"]), "y") + + # Search by disjunction of remote attributes + res = self.ldb.search(expression="(|(badPwdCount=x)(lastLogon=x))", + attrs=["dnsHostName", "lastLogon"]) + self.assertEquals(len(res), 3) + res = sorted(res, key=attrgetter('dn')) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A")) + self.assertFalse("dnsHostName" in res[0]) + self.assertEquals(str(res[0]["lastLogon"]), "x") + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B")) + self.assertFalse("dnsHostName" in res[1]) + self.assertEquals(str(res[1]["lastLogon"]), "y") + self.assertEquals(str(res[2].dn), self.samba4.dn("cn=X")) + self.assertEquals(str(res[2]["dnsHostName"]), "x") + self.assertEquals(str(res[2]["lastLogon"]), "x") + + # Search by disjunction of local and remote attribute + res = self.ldb.search(expression="(|(revision=x)(lastLogon=y))", + attrs=["dnsHostName", "lastLogon"]) + self.assertEquals(len(res), 3) + res = sorted(res, key=attrgetter('dn')) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B")) + self.assertFalse("dnsHostName" in res[0]) + self.assertEquals(str(res[0]["lastLogon"]), "y") + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=X")) + self.assertEquals(str(res[1]["dnsHostName"]), "x") + self.assertEquals(str(res[1]["lastLogon"]), "x") + self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Y")) + self.assertEquals(str(res[2]["dnsHostName"]), "y") + self.assertEquals(str(res[2]["lastLogon"]), "y") + + # Search by disjunction of local and remote attribute w/o match + res = self.ldb.search(expression="(|(codePage=y)(nextRid=z))", + attrs=["dnsHostName", "lastLogon"]) + self.assertEquals(len(res), 0) + + # Search by negated local attribute + res = self.ldb.search(expression="(!(revision=x))", + attrs=["dnsHostName", "lastLogon"]) + self.assertEquals(len(res), 6) + res = sorted(res, key=attrgetter('dn')) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A")) + self.assertTrue(not "dnsHostName" in res[0]) + self.assertEquals(str(res[0]["lastLogon"]), "x") + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B")) + self.assertTrue(not "dnsHostName" in res[1]) + self.assertEquals(str(res[1]["lastLogon"]), "y") + self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C")) + self.assertTrue(not "dnsHostName" in res[2]) + self.assertEquals(str(res[2]["lastLogon"]), "z") + self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z")) + self.assertEquals(str(res[3]["dnsHostName"]), "z") + self.assertEquals(str(res[3]["lastLogon"]), "z") + + # Search by negated remote attribute + res = self.ldb.search(expression="(!(description=x))", + attrs=["dnsHostName", "lastLogon"]) + self.assertEquals(len(res), 4) + res = sorted(res, key=attrgetter('dn')) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=C")) + self.assertTrue(not "dnsHostName" in res[0]) + self.assertEquals(str(res[0]["lastLogon"]), "z") + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Z")) + self.assertEquals(str(res[1]["dnsHostName"]), "z") + self.assertEquals(str(res[1]["lastLogon"]), "z") + + # Search by negated conjunction of local attributes + res = self.ldb.search(expression="(!(&(codePage=x)(revision=x)))", + attrs=["dnsHostName", "lastLogon"]) + self.assertEquals(len(res), 6) + res = sorted(res, key=attrgetter('dn')) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A")) + self.assertTrue(not "dnsHostName" in res[0]) + self.assertEquals(str(res[0]["lastLogon"]), "x") + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B")) + self.assertTrue(not "dnsHostName" in res[1]) + self.assertEquals(str(res[1]["lastLogon"]), "y") + self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C")) + self.assertTrue(not "dnsHostName" in res[2]) + self.assertEquals(str(res[2]["lastLogon"]), "z") + self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z")) + self.assertEquals(str(res[3]["dnsHostName"]), "z") + self.assertEquals(str(res[3]["lastLogon"]), "z") + + # Search by negated conjunction of remote attributes + res = self.ldb.search(expression="(!(&(lastLogon=x)(description=x)))", + attrs=["dnsHostName", "lastLogon"]) + self.assertEquals(len(res), 6) + res = sorted(res, key=attrgetter('dn')) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=B")) + self.assertTrue(not "dnsHostName" in res[0]) + self.assertEquals(str(res[0]["lastLogon"]), "y") + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C")) + self.assertTrue(not "dnsHostName" in res[1]) + self.assertEquals(str(res[1]["lastLogon"]), "z") + self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Y")) + self.assertEquals(str(res[2]["dnsHostName"]), "y") + self.assertEquals(str(res[2]["lastLogon"]), "y") + self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z")) + self.assertEquals(str(res[3]["dnsHostName"]), "z") + self.assertEquals(str(res[3]["lastLogon"]), "z") + + # Search by negated conjunction of local and remote attribute + res = self.ldb.search(expression="(!(&(codePage=x)(description=x)))", + attrs=["dnsHostName", "lastLogon"]) + self.assertEquals(len(res), 6) + res = sorted(res, key=attrgetter('dn')) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A")) + self.assertTrue(not "dnsHostName" in res[0]) + self.assertEquals(str(res[0]["lastLogon"]), "x") + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B")) + self.assertTrue(not "dnsHostName" in res[1]) + self.assertEquals(str(res[1]["lastLogon"]), "y") + self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C")) + self.assertTrue(not "dnsHostName" in res[2]) + self.assertEquals(str(res[2]["lastLogon"]), "z") + self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z")) + self.assertEquals(str(res[3]["dnsHostName"]), "z") + self.assertEquals(str(res[3]["lastLogon"]), "z") + + # Search by negated disjunction of local attributes + res = self.ldb.search(expression="(!(|(revision=x)(dnsHostName=x)))", + attrs=["dnsHostName", "lastLogon"]) + res = sorted(res, key=attrgetter('dn')) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A")) + self.assertTrue(not "dnsHostName" in res[0]) + self.assertEquals(str(res[0]["lastLogon"]), "x") + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B")) + self.assertTrue(not "dnsHostName" in res[1]) + self.assertEquals(str(res[1]["lastLogon"]), "y") + self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C")) + self.assertTrue(not "dnsHostName" in res[2]) + self.assertEquals(str(res[2]["lastLogon"]), "z") + self.assertEquals(str(res[3].dn), self.samba4.dn("cn=Z")) + self.assertEquals(str(res[3]["dnsHostName"]), "z") + self.assertEquals(str(res[3]["lastLogon"]), "z") + + # Search by negated disjunction of remote attributes + res = self.ldb.search(expression="(!(|(badPwdCount=x)(lastLogon=x)))", + attrs=["dnsHostName", "lastLogon"]) + self.assertEquals(len(res), 5) + res = sorted(res, key=attrgetter('dn')) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=C")) + self.assertTrue(not "dnsHostName" in res[0]) + self.assertEquals(str(res[0]["lastLogon"]), "z") + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=Y")) + self.assertEquals(str(res[1]["dnsHostName"]), "y") + self.assertEquals(str(res[1]["lastLogon"]), "y") + self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z")) + self.assertEquals(str(res[2]["dnsHostName"]), "z") + self.assertEquals(str(res[2]["lastLogon"]), "z") + + # Search by negated disjunction of local and remote attribute + res = self.ldb.search(expression="(!(|(revision=x)(lastLogon=y)))", + attrs=["dnsHostName", "lastLogon"]) + self.assertEquals(len(res), 5) + res = sorted(res, key=attrgetter('dn')) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A")) + self.assertTrue(not "dnsHostName" in res[0]) + self.assertEquals(str(res[0]["lastLogon"]), "x") + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=C")) + self.assertTrue(not "dnsHostName" in res[1]) + self.assertEquals(str(res[1]["lastLogon"]), "z") + self.assertEquals(str(res[2].dn), self.samba4.dn("cn=Z")) + self.assertEquals(str(res[2]["dnsHostName"]), "z") + self.assertEquals(str(res[2]["lastLogon"]), "z") + + # Search by complex parse tree + res = self.ldb.search(expression="(|(&(revision=x)(dnsHostName=x))(!(&(description=x)(nextRid=y)))(badPwdCount=y))", attrs=["dnsHostName", "lastLogon"]) + self.assertEquals(len(res), 7) + res = sorted(res, key=attrgetter('dn')) + self.assertEquals(str(res[0].dn), self.samba4.dn("cn=A")) + self.assertTrue(not "dnsHostName" in res[0]) + self.assertEquals(str(res[0]["lastLogon"]), "x") + self.assertEquals(str(res[1].dn), self.samba4.dn("cn=B")) + self.assertTrue(not "dnsHostName" in res[1]) + self.assertEquals(str(res[1]["lastLogon"]), "y") + self.assertEquals(str(res[2].dn), self.samba4.dn("cn=C")) + self.assertTrue(not "dnsHostName" in res[2]) + self.assertEquals(str(res[2]["lastLogon"]), "z") + self.assertEquals(str(res[3].dn), self.samba4.dn("cn=X")) + self.assertEquals(str(res[3]["dnsHostName"]), "x") + self.assertEquals(str(res[3]["lastLogon"]), "x") + self.assertEquals(str(res[4].dn), self.samba4.dn("cn=Z")) + self.assertEquals(str(res[4]["dnsHostName"]), "z") + self.assertEquals(str(res[4]["lastLogon"]), "z") + + # Clean up + dns = [self.samba4.dn("cn=%s" % n) for n in ["A","B","C","X","Y","Z"]] + for dn in dns: + self.ldb.delete(dn) + + def test_map_modify_local(self): + """Modification of local records.""" + # Add local record + dn = "cn=test,dc=idealx,dc=org" + self.ldb.add({"dn": dn, + "cn": "test", + "foo": "bar", + "revision": "1", + "description": "test"}) + # Check it's there + attrs = ["foo", "revision", "description"] + res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertEquals(str(res[0]["foo"]), "bar") + self.assertEquals(str(res[0]["revision"]), "1") + self.assertEquals(str(res[0]["description"]), "test") + # Check it's not in the local db + res = self.samba4.db.search(expression="(cn=test)", + scope=SCOPE_DEFAULT, attrs=attrs) + self.assertEquals(len(res), 0) + # Check it's not in the remote db + res = self.samba3.db.search(expression="(cn=test)", + scope=SCOPE_DEFAULT, attrs=attrs) + self.assertEquals(len(res), 0) + + # Modify local record + ldif = """ +dn: """ + dn + """ +replace: foo +foo: baz +replace: description +description: foo +""" + self.ldb.modify_ldif(ldif) + # Check in local db + res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertEquals(str(res[0]["foo"]), "baz") + self.assertEquals(str(res[0]["revision"]), "1") + self.assertEquals(str(res[0]["description"]), "foo") + + # Rename local record + dn2 = "cn=toast,dc=idealx,dc=org" + self.ldb.rename(dn, dn2) + # Check in local db + res = self.ldb.search(dn2, scope=SCOPE_BASE, attrs=attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn2) + self.assertEquals(str(res[0]["foo"]), "baz") + self.assertEquals(str(res[0]["revision"]), "1") + self.assertEquals(str(res[0]["description"]), "foo") + + # Delete local record + self.ldb.delete(dn2) + # Check it's gone + res = self.ldb.search(dn2, scope=SCOPE_BASE) + self.assertEquals(len(res), 0) + + def test_map_modify_remote_remote(self): + """Modification of remote data of remote records""" + # Add remote record + dn = self.samba4.dn("cn=test") + dn2 = self.samba3.dn("cn=test") + self.samba3.db.add({"dn": dn2, + "cn": "test", + "description": "foo", + "sambaBadPasswordCount": "3", + "sambaNextRid": "1001"}) + # Check it's there + res = self.samba3.db.search(dn2, scope=SCOPE_BASE, + attrs=["description", "sambaBadPasswordCount", "sambaNextRid"]) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn2) + self.assertEquals(str(res[0]["description"]), "foo") + self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "3") + self.assertEquals(str(res[0]["sambaNextRid"]), "1001") + # Check in mapped db + attrs = ["description", "badPwdCount", "nextRid"] + res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs, expression="") + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertEquals(str(res[0]["description"]), "foo") + self.assertEquals(str(res[0]["badPwdCount"]), "3") + self.assertEquals(str(res[0]["nextRid"]), "1001") + # Check in local db + res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs) + self.assertEquals(len(res), 0) + + # Modify remote data of remote record + ldif = """ +dn: """ + dn + """ +replace: description +description: test +replace: badPwdCount +badPwdCount: 4 +""" + self.ldb.modify_ldif(ldif) + # Check in mapped db + res = self.ldb.search(dn, scope=SCOPE_BASE, + attrs=["description", "badPwdCount", "nextRid"]) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertEquals(str(res[0]["description"]), "test") + self.assertEquals(str(res[0]["badPwdCount"]), "4") + self.assertEquals(str(res[0]["nextRid"]), "1001") + # Check in remote db + res = self.samba3.db.search(dn2, scope=SCOPE_BASE, + attrs=["description", "sambaBadPasswordCount", "sambaNextRid"]) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn2) + self.assertEquals(str(res[0]["description"]), "test") + self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4") + self.assertEquals(str(res[0]["sambaNextRid"]), "1001") + + # Rename remote record + dn2 = self.samba4.dn("cn=toast") + self.ldb.rename(dn, dn2) + # Check in mapped db + dn = dn2 + res = self.ldb.search(dn, scope=SCOPE_BASE, + attrs=["description", "badPwdCount", "nextRid"]) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertEquals(str(res[0]["description"]), "test") + self.assertEquals(str(res[0]["badPwdCount"]), "4") + self.assertEquals(str(res[0]["nextRid"]), "1001") + # Check in remote db + dn2 = self.samba3.dn("cn=toast") + res = self.samba3.db.search(dn2, scope=SCOPE_BASE, + attrs=["description", "sambaBadPasswordCount", "sambaNextRid"]) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn2) + self.assertEquals(str(res[0]["description"]), "test") + self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4") + self.assertEquals(str(res[0]["sambaNextRid"]), "1001") + + # Delete remote record + self.ldb.delete(dn) + # Check in mapped db that it's removed + res = self.ldb.search(dn, scope=SCOPE_BASE) + self.assertEquals(len(res), 0) + # Check in remote db + res = self.samba3.db.search(dn2, scope=SCOPE_BASE) + self.assertEquals(len(res), 0) + + def test_map_modify_remote_local(self): + """Modification of local data of remote records""" + # Add remote record (same as before) + dn = self.samba4.dn("cn=test") + dn2 = self.samba3.dn("cn=test") + self.samba3.db.add({"dn": dn2, + "cn": "test", + "description": "foo", + "sambaBadPasswordCount": "3", + "sambaNextRid": "1001"}) + + # Modify local data of remote record + ldif = """ +dn: """ + dn + """ +add: revision +revision: 1 +replace: description +description: test + +""" + self.ldb.modify_ldif(ldif) + # Check in mapped db + attrs = ["revision", "description"] + res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertEquals(str(res[0]["description"]), "test") + self.assertEquals(str(res[0]["revision"]), "1") + # Check in remote db + res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn2) + self.assertEquals(str(res[0]["description"]), "test") + self.assertTrue(not "revision" in res[0]) + # Check in local db + res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertTrue(not "description" in res[0]) + self.assertEquals(str(res[0]["revision"]), "1") + + # Delete (newly) split record + self.ldb.delete(dn) + + def test_map_modify_split(self): + """Testing modification of split records""" + # Add split record + dn = self.samba4.dn("cn=test") + dn2 = self.samba3.dn("cn=test") + self.ldb.add({ + "dn": dn, + "cn": "test", + "description": "foo", + "badPwdCount": "3", + "nextRid": "1001", + "revision": "1"}) + # Check it's there + attrs = ["description", "badPwdCount", "nextRid", "revision"] + res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertEquals(str(res[0]["description"]), "foo") + self.assertEquals(str(res[0]["badPwdCount"]), "3") + self.assertEquals(str(res[0]["nextRid"]), "1001") + self.assertEquals(str(res[0]["revision"]), "1") + # Check in local db + res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertTrue(not "description" in res[0]) + self.assertTrue(not "badPwdCount" in res[0]) + self.assertTrue(not "nextRid" in res[0]) + self.assertEquals(str(res[0]["revision"]), "1") + # Check in remote db + attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", + "revision"] + res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn2) + self.assertEquals(str(res[0]["description"]), "foo") + self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "3") + self.assertEquals(str(res[0]["sambaNextRid"]), "1001") + self.assertTrue(not "revision" in res[0]) + + # Modify of split record + ldif = """ +dn: """ + dn + """ +replace: description +description: test +replace: badPwdCount +badPwdCount: 4 +replace: revision +revision: 2 +""" + self.ldb.modify_ldif(ldif) + # Check in mapped db + attrs = ["description", "badPwdCount", "nextRid", "revision"] + res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertEquals(str(res[0]["description"]), "test") + self.assertEquals(str(res[0]["badPwdCount"]), "4") + self.assertEquals(str(res[0]["nextRid"]), "1001") + self.assertEquals(str(res[0]["revision"]), "2") + # Check in local db + res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertTrue(not "description" in res[0]) + self.assertTrue(not "badPwdCount" in res[0]) + self.assertTrue(not "nextRid" in res[0]) + self.assertEquals(str(res[0]["revision"]), "2") + # Check in remote db + attrs = ["description", "sambaBadPasswordCount", "sambaNextRid", + "revision"] + res = self.samba3.db.search(dn2, scope=SCOPE_BASE, attrs=attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn2) + self.assertEquals(str(res[0]["description"]), "test") + self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4") + self.assertEquals(str(res[0]["sambaNextRid"]), "1001") + self.assertTrue(not "revision" in res[0]) + + # Rename split record + dn2 = self.samba4.dn("cn=toast") + self.ldb.rename(dn, dn2) + # Check in mapped db + dn = dn2 + attrs = ["description", "badPwdCount", "nextRid", "revision"] + res = self.ldb.search(dn, scope=SCOPE_BASE, attrs=attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertEquals(str(res[0]["description"]), "test") + self.assertEquals(str(res[0]["badPwdCount"]), "4") + self.assertEquals(str(res[0]["nextRid"]), "1001") + self.assertEquals(str(res[0]["revision"]), "2") + # Check in local db + res = self.samba4.db.search(dn, scope=SCOPE_BASE, attrs=attrs) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn) + self.assertTrue(not "description" in res[0]) + self.assertTrue(not "badPwdCount" in res[0]) + self.assertTrue(not "nextRid" in res[0]) + self.assertEquals(str(res[0]["revision"]), "2") + # Check in remote db + dn2 = self.samba3.dn("cn=toast") + res = self.samba3.db.search(dn2, scope=SCOPE_BASE, + attrs=["description", "sambaBadPasswordCount", "sambaNextRid", + "revision"]) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0].dn), dn2) + self.assertEquals(str(res[0]["description"]), "test") + self.assertEquals(str(res[0]["sambaBadPasswordCount"]), "4") + self.assertEquals(str(res[0]["sambaNextRid"]), "1001") + self.assertTrue(not "revision" in res[0]) + + # Delete split record + self.ldb.delete(dn) + # Check in mapped db + res = self.ldb.search(dn, scope=SCOPE_BASE) + self.assertEquals(len(res), 0) + # Check in local db + res = self.samba4.db.search(dn, scope=SCOPE_BASE) + self.assertEquals(len(res), 0) + # Check in remote db + res = self.samba3.db.search(dn2, scope=SCOPE_BASE) + self.assertEquals(len(res), 0) diff --git a/python/samba/tests/samba_tool/__init__.py b/python/samba/tests/samba_tool/__init__.py new file mode 100644 index 00000000000..3d7f0591e25 --- /dev/null +++ b/python/samba/tests/samba_tool/__init__.py @@ -0,0 +1,15 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. diff --git a/python/samba/tests/samba_tool/base.py b/python/samba/tests/samba_tool/base.py new file mode 100644 index 00000000000..60ccaa543d7 --- /dev/null +++ b/python/samba/tests/samba_tool/base.py @@ -0,0 +1,114 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com> 2011 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +# This provides a wrapper around the cmd interface so that tests can +# easily be built on top of it and have minimal code to run basic tests +# of the commands. A list of the environmental variables can be found in +# ~/selftest/selftest.pl +# +# These can all be accesses via os.environ["VARIBLENAME"] when needed + +import random +import string +from samba.auth import system_session +from samba.samdb import SamDB +from cStringIO import StringIO +from samba.netcmd.main import cmd_sambatool +import samba.tests + +class SambaToolCmdTest(samba.tests.TestCaseInTempDir): + + def getSamDB(self, *argv): + """a convenience function to get a samdb instance so that we can query it""" + + # We build a fake command to get the options created the same + # way the command classes do it. It would be better if the command + # classes had a way to more cleanly do this, but this lets us write + # tests for now + cmd = cmd_sambatool.subcommands["user"].subcommands["setexpiry"] + parser, optiongroups = cmd._create_parser("user") + opts, args = parser.parse_args(list(argv)) + # Filter out options from option groups + args = args[1:] + kwargs = dict(opts.__dict__) + for option_group in parser.option_groups: + for option in option_group.option_list: + if option.dest is not None: + del kwargs[option.dest] + kwargs.update(optiongroups) + + H = kwargs.get("H", None) + sambaopts = kwargs.get("sambaopts", None) + credopts = kwargs.get("credopts", None) + + lp = sambaopts.get_loadparm() + creds = credopts.get_credentials(lp, fallback_machine=True) + + samdb = SamDB(url=H, session_info=system_session(), + credentials=creds, lp=lp) + return samdb + + + def runcmd(self, name, *args): + """run a single level command""" + cmd = cmd_sambatool.subcommands[name] + cmd.outf = StringIO() + cmd.errf = StringIO() + result = cmd._run(name, *args) + return (result, cmd.outf.getvalue(), cmd.errf.getvalue()) + + def runsubcmd(self, name, sub, *args): + """run a command with sub commands""" + # The reason we need this function seperate from runcmd is + # that the .outf StringIO assignment is overriden if we use + # runcmd, so we can't capture stdout and stderr + cmd = cmd_sambatool.subcommands[name].subcommands[sub] + cmd.outf = StringIO() + cmd.errf = StringIO() + result = cmd._run(name, *args) + return (result, cmd.outf.getvalue(), cmd.errf.getvalue()) + + def assertCmdSuccess(self, val, msg=""): + self.assertIsNone(val, msg) + + def assertCmdFail(self, val, msg=""): + self.assertIsNotNone(val, msg) + + def assertMatch(self, base, string, msg=""): + self.assertTrue(string in base, msg) + + def randomName(self, count=8): + """Create a random name, cap letters and numbers, and always starting with a letter""" + name = random.choice(string.ascii_uppercase) + name += ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase+ string.digits) for x in range(count - 1)) + return name + + def randomPass(self, count=16): + name = random.choice(string.ascii_uppercase) + name += random.choice(string.digits) + name += random.choice(string.ascii_lowercase) + name += ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase+ string.digits) for x in range(count - 3)) + return name + + def randomXid(self): + # pick some hopefully unused, high UID/GID range to avoid interference + # from the system the test runs on + xid = random.randint(4711000, 4799000) + return xid + + def assertWithin(self, val1, val2, delta, msg=""): + """Assert that val1 is within delta of val2, useful for time computations""" + self.assertTrue(((val1 + delta) > val2) and ((val1 - delta) < val2), msg) diff --git a/python/samba/tests/samba_tool/gpo.py b/python/samba/tests/samba_tool/gpo.py new file mode 100644 index 00000000000..e20a97794ae --- /dev/null +++ b/python/samba/tests/samba_tool/gpo.py @@ -0,0 +1,79 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Andrew Bartlett 2012 +# +# based on time.py: +# Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com> 2011 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import os +from samba.tests.samba_tool.base import SambaToolCmdTest +import shutil + +class GpoCmdTestCase(SambaToolCmdTest): + """Tests for samba-tool time subcommands""" + + gpo_name = "testgpo" + + def test_gpo_list(self): + """Run gpo list against the server and make sure it looks accurate""" + (result, out, err) = self.runsubcmd("gpo", "listall", "-H", "ldap://%s" % os.environ["SERVER"]) + self.assertCmdSuccess(result, "Ensuring gpo listall ran successfully") + + def test_fetchfail(self): + """Run against a non-existent GPO, and make sure it fails (this hard-coded UUID is very unlikely to exist""" + (result, out, err) = self.runsubcmd("gpo", "fetch", "c25cac17-a02a-4151-835d-fae17446ee43", "-H", "ldap://%s" % os.environ["SERVER"]) + self.assertEquals(result, -1, "check for result code") + + def test_fetch(self): + """Run against a real GPO, and make sure it passes""" + (result, out, err) = self.runsubcmd("gpo", "fetch", self.gpo_guid, "-H", "ldap://%s" % os.environ["SERVER"], "--tmpdir", self.tempdir) + self.assertCmdSuccess(result, "Ensuring gpo fetched successfully") + shutil.rmtree(os.path.join(self.tempdir, "policy")) + + def test_show(self): + """Show a real GPO, and make sure it passes""" + (result, out, err) = self.runsubcmd("gpo", "show", self.gpo_guid, "-H", "ldap://%s" % os.environ["SERVER"]) + self.assertCmdSuccess(result, "Ensuring gpo fetched successfully") + + def test_show_as_admin(self): + """Show a real GPO, and make sure it passes""" + (result, out, err) = self.runsubcmd("gpo", "show", self.gpo_guid, "-H", "ldap://%s" % os.environ["SERVER"], "-U%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])) + self.assertCmdSuccess(result, "Ensuring gpo fetched successfully") + + def test_aclcheck(self): + """Check all the GPOs on the remote server have correct ACLs""" + (result, out, err) = self.runsubcmd("gpo", "aclcheck", "-H", "ldap://%s" % os.environ["SERVER"], "-U%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])) + self.assertCmdSuccess(result, "Ensuring gpo checked successfully") + + def setUp(self): + """set up a temporary GPO to work with""" + super(GpoCmdTestCase, self).setUp() + (result, out, err) = self.runsubcmd("gpo", "create", self.gpo_name, + "-H", "ldap://%s" % os.environ["SERVER"], + "-U%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"]), + "--tmpdir", self.tempdir) + shutil.rmtree(os.path.join(self.tempdir, "policy")) + self.assertCmdSuccess(result, "Ensuring gpo created successfully") + try: + self.gpo_guid = "{%s}" % out.split("{")[1].split("}")[0] + except IndexError: + self.fail("Failed to find GUID in output: %s" % out) + + def tearDown(self): + """remove the temporary GPO to work with""" + (result, out, err) = self.runsubcmd("gpo", "del", self.gpo_guid, "-H", "ldap://%s" % os.environ["SERVER"], "-U%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"])) + self.assertCmdSuccess(result, "Ensuring gpo deleted successfully") + super(GpoCmdTestCase, self).tearDown() diff --git a/python/samba/tests/samba_tool/group.py b/python/samba/tests/samba_tool/group.py new file mode 100644 index 00000000000..2c0c46e5dc8 --- /dev/null +++ b/python/samba/tests/samba_tool/group.py @@ -0,0 +1,169 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Michael Adam 2012 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import os +import time +import ldb +from samba.tests.samba_tool.base import SambaToolCmdTest +from samba import ( + nttime2unix, + dsdb + ) + +class GroupCmdTestCase(SambaToolCmdTest): + """Tests for samba-tool group subcommands""" + groups = [] + samdb = None + + def setUp(self): + super(GroupCmdTestCase, self).setUp() + self.samdb = self.getSamDB("-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + self.groups = [] + self.groups.append(self._randomGroup({"name": "testgroup1"})) + self.groups.append(self._randomGroup({"name": "testgroup2"})) + self.groups.append(self._randomGroup({"name": "testgroup3"})) + self.groups.append(self._randomGroup({"name": "testgroup4"})) + + # setup the 4 groups and ensure they are correct + for group in self.groups: + (result, out, err) = self._create_group(group) + + self.assertCmdSuccess(result) + self.assertEquals(err, "", "There shouldn't be any error message") + self.assertIn("Added group %s" % group["name"], out) + + found = self._find_group(group["name"]) + + self.assertIsNotNone(found) + + self.assertEquals("%s" % found.get("name"), group["name"]) + self.assertEquals("%s" % found.get("description"), group["description"]) + + def tearDown(self): + super(GroupCmdTestCase, self).tearDown() + # clean up all the left over groups, just in case + for group in self.groups: + if self._find_group(group["name"]): + self.runsubcmd("group", "delete", group["name"]) + + + def test_newgroup(self): + """This tests the "group add" and "group delete" commands""" + # try to add all the groups again, this should fail + for group in self.groups: + (result, out, err) = self._create_group(group) + self.assertCmdFail(result, "Succeeded to create existing group") + self.assertIn("LDAP error 68 LDAP_ENTRY_ALREADY_EXISTS", err) + + # try to delete all the groups we just added + for group in self.groups: + (result, out, err) = self.runsubcmd("group", "delete", group["name"]) + self.assertCmdSuccess(result, + "Failed to delete group '%s'" % group["name"]) + found = self._find_group(group["name"]) + self.assertIsNone(found, + "Deleted group '%s' still exists" % group["name"]) + + # test adding groups + for group in self.groups: + (result, out, err) = self.runsubcmd("group", "add", group["name"], + "--description=%s" % group["description"], + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + + self.assertCmdSuccess(result) + self.assertEquals(err,"","There shouldn't be any error message") + self.assertIn("Added group %s" % group["name"], out) + + found = self._find_group(group["name"]) + + self.assertEquals("%s" % found.get("samaccountname"), + "%s" % group["name"]) + + + def test_list(self): + (result, out, err) = self.runsubcmd("group", "list", + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, "Error running list") + + search_filter = "(objectClass=group)" + + grouplist = self.samdb.search(base=self.samdb.domain_dn(), + scope=ldb.SCOPE_SUBTREE, + expression=search_filter, + attrs=["samaccountname"]) + + self.assertTrue(len(grouplist) > 0, "no groups found in samdb") + + for groupobj in grouplist: + name = groupobj.get("samaccountname", idx=0) + found = self.assertMatch(out, name, + "group '%s' not found" % name) + + def test_listmembers(self): + (result, out, err) = self.runsubcmd("group", "listmembers", "Domain Users", + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, "Error running listmembers") + + search_filter = "(|(primaryGroupID=513)(memberOf=CN=Domain Users,CN=Users,%s))" % self.samdb.domain_dn() + + grouplist = self.samdb.search(base=self.samdb.domain_dn(), + scope=ldb.SCOPE_SUBTREE, + expression=search_filter, + attrs=["samAccountName"]) + + self.assertTrue(len(grouplist) > 0, "no groups found in samdb") + + for groupobj in grouplist: + name = groupobj.get("samAccountName", idx=0) + found = self.assertMatch(out, name, "group '%s' not found" % name) + + def _randomGroup(self, base={}): + """create a group with random attribute values, you can specify base attributes""" + group = { + "name": self.randomName(), + "description": self.randomName(count=100), + } + group.update(base) + return group + + def _create_group(self, group): + return self.runsubcmd("group", "add", group["name"], + "--description=%s" % group["description"], + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + + def _find_group(self, name): + search_filter = ("(&(sAMAccountName=%s)(objectCategory=%s,%s))" % + (ldb.binary_encode(name), + "CN=Group,CN=Schema,CN=Configuration", + self.samdb.domain_dn())) + grouplist = self.samdb.search(base=self.samdb.domain_dn(), + scope=ldb.SCOPE_SUBTREE, + expression=search_filter, + attrs=[]) + if grouplist: + return grouplist[0] + else: + return None diff --git a/python/samba/tests/samba_tool/ntacl.py b/python/samba/tests/samba_tool/ntacl.py new file mode 100644 index 00000000000..2a329fe7d40 --- /dev/null +++ b/python/samba/tests/samba_tool/ntacl.py @@ -0,0 +1,135 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Andrew Bartlett 2012 +# +# Based on user.py: +# Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com> 2011 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import os +import time +import ldb +from samba.tests.samba_tool.base import SambaToolCmdTest +import random + +class NtACLCmdSysvolTestCase(SambaToolCmdTest): + """Tests for samba-tool ntacl sysvol* subcommands""" + + + def test_ntvfs(self): + (result, out, err) = self.runsubcmd("ntacl", "sysvolreset", + "--use-ntvfs") + self.assertCmdSuccess(result) + self.assertEquals(out,"","Shouldn't be any output messages") + self.assertIn("Please note that POSIX permissions have NOT been changed, only the stored NT ACL", err) + + def test_s3fs(self): + (result, out, err) = self.runsubcmd("ntacl", "sysvolreset", + "--use-s3fs") + + self.assertCmdSuccess(result) + self.assertEquals(err,"","Shouldn't be any error messages") + self.assertEquals(out,"","Shouldn't be any output messages") + + def test_ntvfs_check(self): + (result, out, err) = self.runsubcmd("ntacl", "sysvolreset", + "--use-ntvfs") + self.assertCmdSuccess(result) + self.assertEquals(out,"","Shouldn't be any output messages") + self.assertIn("Please note that POSIX permissions have NOT been changed, only the stored NT ACL", err) + # Now check they were set correctly + (result, out, err) = self.runsubcmd("ntacl", "sysvolcheck") + self.assertCmdSuccess(result) + self.assertEquals(err,"","Shouldn't be any error messages") + self.assertEquals(out,"","Shouldn't be any output messages") + + def test_s3fs_check(self): + (result, out, err) = self.runsubcmd("ntacl", "sysvolreset", + "--use-s3fs") + + self.assertCmdSuccess(result) + self.assertEquals(err,"","Shouldn't be any error messages") + self.assertEquals(out,"","Shouldn't be any output messages") + + # Now check they were set correctly + (result, out, err) = self.runsubcmd("ntacl", "sysvolcheck") + self.assertCmdSuccess(result) + self.assertEquals(err,"","Shouldn't be any error messages") + self.assertEquals(out,"","Shouldn't be any output messages") + +class NtACLCmdGetSetTestCase(SambaToolCmdTest): + """Tests for samba-tool ntacl get/set subcommands""" + + acl = "O:DAG:DUD:P(A;OICI;0x001f01ff;;;DA)(A;OICI;0x001f01ff;;;EA)(A;OICIIO;0x001f01ff;;;CO)(A;OICI;0x001f01ff;;;DA)(A;OICI;0x001f01ff;;;SY)(A;OICI;0x001200a9;;;AU)(A;OICI;0x001200a9;;;ED)S:AI(OU;CIIDSA;WP;f30e3bbe-9ff0-11d1-b603-0000f80367c1;bf967aa5-0de6-11d0-a285-00aa003049e2;WD)(OU;CIIDSA;WP;f30e3bbf-9ff0-11d1-b603-0000f80367c1;bf967aa5-0de6-11d0-a285-00aa003049e2;WD)" + + + def test_ntvfs(self): + path = os.environ['SELFTEST_PREFIX'] + tempf = os.path.join(path,"pytests"+str(int(100000*random.random()))) + open(tempf, 'w').write("empty") + + (result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf, + "--use-ntvfs") + self.assertCmdSuccess(result) + self.assertEquals(out,"","Shouldn't be any output messages") + self.assertIn("Please note that POSIX permissions have NOT been changed, only the stored NT ACL", err) + + def test_s3fs(self): + path = os.environ['SELFTEST_PREFIX'] + tempf = os.path.join(path,"pytests"+str(int(100000*random.random()))) + open(tempf, 'w').write("empty") + + (result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf, + "--use-s3fs") + + self.assertCmdSuccess(result) + self.assertEquals(err,"","Shouldn't be any error messages") + self.assertEquals(out,"","Shouldn't be any output messages") + + def test_ntvfs_check(self): + path = os.environ['SELFTEST_PREFIX'] + tempf = os.path.join(path,"pytests"+str(int(100000*random.random()))) + open(tempf, 'w').write("empty") + + (result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf, + "--use-ntvfs") + self.assertCmdSuccess(result) + self.assertEquals(out,"","Shouldn't be any output messages") + self.assertIn("Please note that POSIX permissions have NOT been changed, only the stored NT ACL", err) + + # Now check they were set correctly + (result, out, err) = self.runsubcmd("ntacl", "get", tempf, + "--use-ntvfs", "--as-sddl") + self.assertCmdSuccess(result) + self.assertEquals(err,"","Shouldn't be any error messages") + self.assertEquals(self.acl+"\n", out, "Output should be the ACL") + + def test_s3fs_check(self): + path = os.environ['SELFTEST_PREFIX'] + tempf = os.path.join(path,"pytests"+str(int(100000*random.random()))) + open(tempf, 'w').write("empty") + + (result, out, err) = self.runsubcmd("ntacl", "set", self.acl, tempf, + "--use-s3fs") + self.assertCmdSuccess(result) + self.assertEquals(out,"","Shouldn't be any output messages") + self.assertEquals(err,"","Shouldn't be any error messages") + + # Now check they were set correctly + (result, out, err) = self.runsubcmd("ntacl", "get", tempf, + "--use-s3fs", "--as-sddl") + self.assertCmdSuccess(result) + self.assertEquals(err,"","Shouldn't be any error messages") + self.assertEquals(self.acl+"\n", out,"Output should be the ACL") diff --git a/python/samba/tests/samba_tool/processes.py b/python/samba/tests/samba_tool/processes.py new file mode 100644 index 00000000000..91a5266b54a --- /dev/null +++ b/python/samba/tests/samba_tool/processes.py @@ -0,0 +1,35 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Andrew Bartlett 2012 +# +# based on time.py: +# Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com> 2011 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import os +from samba.tests.samba_tool.base import SambaToolCmdTest + +class ProcessCmdTestCase(SambaToolCmdTest): + """Tests for samba-tool process subcommands""" + + def test_name(self): + """Run processes command""" + (result, out, err) = self.runcmd("processes", "--name", "samba") + self.assertCmdSuccess(result, "Ensuring processes ran successfully") + + def test_all(self): + """Run processes command""" + (result, out, err) = self.runcmd("processes") + self.assertCmdSuccess(result, "Ensuring processes ran successfully") diff --git a/python/samba/tests/samba_tool/timecmd.py b/python/samba/tests/samba_tool/timecmd.py new file mode 100644 index 00000000000..000f0f28282 --- /dev/null +++ b/python/samba/tests/samba_tool/timecmd.py @@ -0,0 +1,43 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com> 2011 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import os +from time import localtime, strptime, mktime +from samba.tests.samba_tool.base import SambaToolCmdTest + +class TimeCmdTestCase(SambaToolCmdTest): + """Tests for samba-tool time subcommands""" + + def test_timeget(self): + """Run time against the server and make sure it looks accurate""" + (result, out, err) = self.runcmd("time", os.environ["SERVER"]) + self.assertCmdSuccess(result, "Ensuring time ran successfully") + + timefmt = strptime(out, "%a %b %d %H:%M:%S %Y %Z\n") + servertime = int(mktime(timefmt)) + now = int(mktime(localtime())) + + # because there is a race here, allow up to 5 seconds difference in times + delta = 5 + self.assertTrue((servertime > (now - delta) and (servertime < (now + delta)), "Time is now")) + + def test_timefail(self): + """Run time against a non-existent server, and make sure it fails""" + (result, out, err) = self.runcmd("time", "notaserver") + self.assertEquals(result, -1, "check for result code") + self.assertTrue(err.strip().endswith("NT_STATUS_OBJECT_NAME_NOT_FOUND"), "ensure right error string") + self.assertEquals(out, "", "ensure no output returned") diff --git a/python/samba/tests/samba_tool/user.py b/python/samba/tests/samba_tool/user.py new file mode 100644 index 00000000000..33344cd3d31 --- /dev/null +++ b/python/samba/tests/samba_tool/user.py @@ -0,0 +1,362 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com> 2011 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +import os +import time +import ldb +from samba.tests.samba_tool.base import SambaToolCmdTest +from samba import ( + nttime2unix, + dsdb + ) + +class UserCmdTestCase(SambaToolCmdTest): + """Tests for samba-tool user subcommands""" + users = [] + samdb = None + + def setUp(self): + super(UserCmdTestCase, self).setUp() + self.samdb = self.getSamDB("-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + self.users = [] + self.users.append(self._randomUser({"name": "sambatool1", "company": "comp1"})) + self.users.append(self._randomUser({"name": "sambatool2", "company": "comp1"})) + self.users.append(self._randomUser({"name": "sambatool3", "company": "comp2"})) + self.users.append(self._randomUser({"name": "sambatool4", "company": "comp2"})) + self.users.append(self._randomPosixUser({"name": "posixuser1"})) + self.users.append(self._randomPosixUser({"name": "posixuser2"})) + self.users.append(self._randomPosixUser({"name": "posixuser3"})) + self.users.append(self._randomPosixUser({"name": "posixuser4"})) + + # setup the 8 users and ensure they are correct + for user in self.users: + (result, out, err) = user["createUserFn"](user) + + self.assertCmdSuccess(result) + self.assertEquals(err,"","Shouldn't be any error messages") + self.assertIn("User '%s' created successfully" % user["name"], out) + + user["checkUserFn"](user) + + + def tearDown(self): + super(UserCmdTestCase, self).tearDown() + # clean up all the left over users, just in case + for user in self.users: + if self._find_user(user["name"]): + self.runsubcmd("user", "delete", user["name"]) + + + def test_newuser(self): + # try to add all the users again, this should fail + for user in self.users: + (result, out, err) = self._create_user(user) + self.assertCmdFail(result, "Ensure that create user fails") + self.assertIn("LDAP error 68 LDAP_ENTRY_ALREADY_EXISTS", err) + + # try to delete all the 4 users we just added + for user in self.users: + (result, out, err) = self.runsubcmd("user", "delete", user["name"]) + self.assertCmdSuccess(result, "Can we delete users") + found = self._find_user(user["name"]) + self.assertIsNone(found) + + # test adding users with --use-username-as-cn + for user in self.users: + (result, out, err) = self.runsubcmd("user", "add", user["name"], user["password"], + "--use-username-as-cn", + "--surname=%s" % user["surname"], + "--given-name=%s" % user["given-name"], + "--job-title=%s" % user["job-title"], + "--department=%s" % user["department"], + "--description=%s" % user["description"], + "--company=%s" % user["company"], + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + + self.assertCmdSuccess(result) + self.assertEquals(err,"","Shouldn't be any error messages") + self.assertIn("User '%s' created successfully" % user["name"], out) + + found = self._find_user(user["name"]) + + self.assertEquals("%s" % found.get("cn"), "%(name)s" % user) + self.assertEquals("%s" % found.get("name"), "%(name)s" % user) + + + + def test_setpassword(self): + for user in self.users: + newpasswd = self.randomPass() + (result, out, err) = self.runsubcmd("user", "setpassword", + user["name"], + "--newpassword=%s" % newpasswd, + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + # self.assertCmdSuccess(result, "Ensure setpassword runs") + self.assertEquals(err,"","setpassword with url") + self.assertMatch(out, "Changed password OK", "setpassword with url") + + for user in self.users: + newpasswd = self.randomPass() + (result, out, err) = self.runsubcmd("user", "setpassword", + user["name"], + "--newpassword=%s" % newpasswd) + # self.assertCmdSuccess(result, "Ensure setpassword runs") + self.assertEquals(err,"","setpassword without url") + self.assertMatch(out, "Changed password OK", "setpassword without url") + + for user in self.users: + newpasswd = self.randomPass() + (result, out, err) = self.runsubcmd("user", "setpassword", + user["name"], + "--newpassword=%s" % newpasswd, + "--must-change-at-next-login", + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + # self.assertCmdSuccess(result, "Ensure setpassword runs") + self.assertEquals(err,"","setpassword with forced change") + self.assertMatch(out, "Changed password OK", "setpassword with forced change") + + + + + def test_setexpiry(self): + twodays = time.time() + (2 * 24 * 60 * 60) + + for user in self.users: + (result, out, err) = self.runsubcmd("user", "setexpiry", user["name"], + "--days=2", + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, "Can we run setexpiry with names") + self.assertIn("Expiry for user '%s' set to 2 days." % user["name"], out) + + for user in self.users: + found = self._find_user(user["name"]) + + expires = nttime2unix(int("%s" % found.get("accountExpires"))) + self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time") + + # TODO: renable this after the filter case is sorted out + if "filters are broken, bail now": + return + + # now run the expiration based on a filter + fourdays = time.time() + (4 * 24 * 60 * 60) + (result, out, err) = self.runsubcmd("user", "setexpiry", + "--filter", "(&(objectClass=user)(company=comp2))", + "--days=4", + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, "Can we run setexpiry with a filter") + + for user in self.users: + found = self._find_user(user["name"]) + if ("%s" % found.get("company")) == "comp2": + expires = nttime2unix(int("%s" % found.get("accountExpires"))) + self.assertWithin(expires, fourdays, 5, "Ensure account expires is within 5 seconds of the expected time") + else: + expires = nttime2unix(int("%s" % found.get("accountExpires"))) + self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time") + + + def test_list(self): + (result, out, err) = self.runsubcmd("user", "list", + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], + os.environ["DC_PASSWORD"])) + self.assertCmdSuccess(result, "Error running list") + + search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" % + (ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT)) + + userlist = self.samdb.search(base=self.samdb.domain_dn(), + scope=ldb.SCOPE_SUBTREE, + expression=search_filter, + attrs=["samaccountname"]) + + self.assertTrue(len(userlist) > 0, "no users found in samdb") + + for userobj in userlist: + name = userobj.get("samaccountname", idx=0) + found = self.assertMatch(out, name, + "user '%s' not found" % name) + def test_getpwent(self): + try: + import pwd + except ImportError: + self.skipTest("Skipping getpwent test, no 'pwd' module available") + return + + # get the current user's data for the test + uid = os.geteuid() + try: + u = pwd.getpwuid(uid) + except KeyError: + self.skipTest("Skipping getpwent test, current EUID not found in NSS") + return + + user = self._randomPosixUser({ + "name": u[0], + "uid": u[0], + "uidNumber": u[2], + "gidNumber": u[3], + "gecos": u[4], + "loginShell": u[6], + }) + # check if --rfc2307-from-nss sets the same values as we got from pwd.getpwuid() + (result, out, err) = self.runsubcmd("user", "add", user["name"], user["password"], + "--surname=%s" % user["surname"], + "--given-name=%s" % user["given-name"], + "--job-title=%s" % user["job-title"], + "--department=%s" % user["department"], + "--description=%s" % user["description"], + "--company=%s" % user["company"], + "--rfc2307-from-nss", + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + + self.assertCmdSuccess(result) + self.assertEquals(err,"","Shouldn't be any error messages") + self.assertIn("User '%s' created successfully" % user["name"], out) + + self._check_posix_user(user) + self.runsubcmd("user", "delete", user["name"]) + + # Check if overriding the attributes from NSS with explicit values works + # + # get a user with all random posix attributes + user = self._randomPosixUser({"name": u[0]}) + # create a user with posix attributes from nss but override all of them with the + # random ones just obtained + (result, out, err) = self.runsubcmd("user", "add", user["name"], user["password"], + "--surname=%s" % user["surname"], + "--given-name=%s" % user["given-name"], + "--job-title=%s" % user["job-title"], + "--department=%s" % user["department"], + "--description=%s" % user["description"], + "--company=%s" % user["company"], + "--rfc2307-from-nss", + "--gecos=%s" % user["gecos"], + "--login-shell=%s" % user["loginShell"], + "--uid=%s" % user["uid"], + "--uid-number=%s" % user["uidNumber"], + "--gid-number=%s" % user["gidNumber"], + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + + self.assertCmdSuccess(result) + self.assertEquals(err,"","Shouldn't be any error messages") + self.assertIn("User '%s' created successfully" % user["name"], out) + + self._check_posix_user(user) + self.runsubcmd("user", "delete", user["name"]) + + def _randomUser(self, base={}): + """create a user with random attribute values, you can specify base attributes""" + user = { + "name": self.randomName(), + "password": self.randomPass(), + "surname": self.randomName(), + "given-name": self.randomName(), + "job-title": self.randomName(), + "department": self.randomName(), + "company": self.randomName(), + "description": self.randomName(count=100), + "createUserFn": self._create_user, + "checkUserFn": self._check_user, + } + user.update(base) + return user + + def _randomPosixUser(self, base={}): + """create a user with random attribute values and additional RFC2307 + attributes, you can specify base attributes""" + user = self._randomUser({}) + user.update(base) + posixAttributes = { + "uid": self.randomName(), + "loginShell": self.randomName(), + "gecos": self.randomName(), + "uidNumber": self.randomXid(), + "gidNumber": self.randomXid(), + "createUserFn": self._create_posix_user, + "checkUserFn": self._check_posix_user, + } + user.update(posixAttributes) + user.update(base) + return user + + def _check_user(self, user): + """ check if a user from SamDB has the same attributes as its template """ + found = self._find_user(user["name"]) + + self.assertEquals("%s" % found.get("name"), "%(given-name)s %(surname)s" % user) + self.assertEquals("%s" % found.get("title"), user["job-title"]) + self.assertEquals("%s" % found.get("company"), user["company"]) + self.assertEquals("%s" % found.get("description"), user["description"]) + self.assertEquals("%s" % found.get("department"), user["department"]) + + def _check_posix_user(self, user): + """ check if a posix_user from SamDB has the same attributes as its template """ + found = self._find_user(user["name"]) + + self.assertEquals("%s" % found.get("loginShell"), user["loginShell"]) + self.assertEquals("%s" % found.get("gecos"), user["gecos"]) + self.assertEquals("%s" % found.get("uidNumber"), "%s" % user["uidNumber"]) + self.assertEquals("%s" % found.get("gidNumber"), "%s" % user["gidNumber"]) + self.assertEquals("%s" % found.get("uid"), user["uid"]) + self._check_user(user) + + def _create_user(self, user): + return self.runsubcmd("user", "add", user["name"], user["password"], + "--surname=%s" % user["surname"], + "--given-name=%s" % user["given-name"], + "--job-title=%s" % user["job-title"], + "--department=%s" % user["department"], + "--description=%s" % user["description"], + "--company=%s" % user["company"], + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + def _create_posix_user(self, user): + """ create a new user with RFC2307 attributes """ + return self.runsubcmd("user", "create", user["name"], user["password"], + "--surname=%s" % user["surname"], + "--given-name=%s" % user["given-name"], + "--job-title=%s" % user["job-title"], + "--department=%s" % user["department"], + "--description=%s" % user["description"], + "--company=%s" % user["company"], + "--gecos=%s" % user["gecos"], + "--login-shell=%s" % user["loginShell"], + "--uid=%s" % user["uid"], + "--uid-number=%s" % user["uidNumber"], + "--gid-number=%s" % user["gidNumber"], + "-H", "ldap://%s" % os.environ["DC_SERVER"], + "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"])) + + def _find_user(self, name): + search_filter = "(&(sAMAccountName=%s)(objectCategory=%s,%s))" % (ldb.binary_encode(name), "CN=Person,CN=Schema,CN=Configuration", self.samdb.domain_dn()) + userlist = self.samdb.search(base=self.samdb.domain_dn(), + scope=ldb.SCOPE_SUBTREE, + expression=search_filter, attrs=[]) + if userlist: + return userlist[0] + else: + return None diff --git a/python/samba/tests/samdb.py b/python/samba/tests/samdb.py new file mode 100644 index 00000000000..5c80391cbae --- /dev/null +++ b/python/samba/tests/samdb.py @@ -0,0 +1,96 @@ +# Unix SMB/CIFS implementation. Tests for SamDB +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2008 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.samdb.""" + +import logging +import os +import uuid + +from samba.auth import system_session +from samba.provision import (setup_samdb, guess_names, make_smbconf, + provision_paths_from_lp) +from samba.provision import DEFAULT_POLICY_GUID, DEFAULT_DC_POLICY_GUID +from samba.provision.backend import ProvisionBackend +from samba.tests import TestCaseInTempDir +from samba.dcerpc import security +from samba.schema import Schema +from samba import param + + +class SamDBTestCase(TestCaseInTempDir): + """Base-class for tests with a Sam Database. + + This is used by the Samba SamDB-tests, but e.g. also by the OpenChange + provisioning tests (which need a Sam). + """ + + def setUp(self): + super(SamDBTestCase, self).setUp() + invocationid = str(uuid.uuid4()) + domaindn = "DC=COM,DC=EXAMPLE" + self.domaindn = domaindn + configdn = "CN=Configuration," + domaindn + schemadn = "CN=Schema," + configdn + domainguid = str(uuid.uuid4()) + policyguid = DEFAULT_POLICY_GUID + domainsid = security.random_sid() + path = os.path.join(self.tempdir, "samdb.ldb") + session_info = system_session() + + hostname="foo" + domain="EXAMPLE" + dnsdomain="example.com" + serverrole="domain controller" + policyguid_dc = DEFAULT_DC_POLICY_GUID + + smbconf = os.path.join(self.tempdir, "smb.conf") + make_smbconf(smbconf, hostname, domain, dnsdomain, + self.tempdir, serverrole=serverrole) + + self.lp = param.LoadParm() + self.lp.load(smbconf) + + names = guess_names(lp=self.lp, hostname=hostname, + domain=domain, dnsdomain=dnsdomain, + serverrole=serverrole, + domaindn=self.domaindn, configdn=configdn, + schemadn=schemadn) + + paths = provision_paths_from_lp(self.lp, names.dnsdomain) + + logger = logging.getLogger("provision") + + provision_backend = ProvisionBackend("ldb", paths=paths, + lp=self.lp, credentials=None, + names=names, logger=logger) + + schema = Schema(domainsid, invocationid=invocationid, + schemadn=names.schemadn, serverdn=names.serverdn, + am_rodc=False) + + self.samdb = setup_samdb(path, session_info, + provision_backend, self.lp, names, logger, + domainsid, domainguid, policyguid, policyguid_dc, False, + "secret", "secret", "secret", invocationid, "secret", + None, "domain controller", schema=schema) + + def tearDown(self): + for f in ['schema.ldb', 'configuration.ldb', + 'users.ldb', 'samdb.ldb', 'smb.conf']: + os.remove(os.path.join(self.tempdir, f)) + super(SamDBTestCase, self).tearDown() diff --git a/python/samba/tests/security.py b/python/samba/tests/security.py new file mode 100644 index 00000000000..d2938aacb02 --- /dev/null +++ b/python/samba/tests/security.py @@ -0,0 +1,143 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.dcerpc.security.""" + +import samba.tests +from samba.dcerpc import security + +class SecurityTokenTests(samba.tests.TestCase): + + def setUp(self): + super(SecurityTokenTests, self).setUp() + self.token = security.token() + + def test_is_system(self): + self.assertFalse(self.token.is_system()) + + def test_is_anonymous(self): + self.assertFalse(self.token.is_anonymous()) + + def test_has_builtin_administrators(self): + self.assertFalse(self.token.has_builtin_administrators()) + + def test_has_nt_authenticated_users(self): + self.assertFalse(self.token.has_nt_authenticated_users()) + + def test_has_priv(self): + self.assertFalse(self.token.has_privilege(security.SEC_PRIV_SHUTDOWN)) + + def test_set_priv(self): + self.assertFalse(self.token.has_privilege(security.SEC_PRIV_SHUTDOWN)) + self.assertFalse(self.token.set_privilege(security.SEC_PRIV_SHUTDOWN)) + self.assertTrue(self.token.has_privilege(security.SEC_PRIV_SHUTDOWN)) + + +class SecurityDescriptorTests(samba.tests.TestCase): + + def setUp(self): + super(SecurityDescriptorTests, self).setUp() + self.descriptor = security.descriptor() + + def test_from_sddl(self): + desc = security.descriptor.from_sddl("O:AOG:DAD:(A;;RPWPCCDCLCSWRCWDWOGA;;;S-1-0-0)", security.dom_sid("S-2-0-0")) + self.assertEquals(desc.group_sid, security.dom_sid('S-2-0-0-512')) + self.assertEquals(desc.owner_sid, security.dom_sid('S-1-5-32-548')) + self.assertEquals(desc.revision, 1) + self.assertEquals(desc.sacl, None) + self.assertEquals(desc.type, 0x8004) + + def test_from_sddl_invalidsddl(self): + self.assertRaises(TypeError,security.descriptor.from_sddl, "foo",security.dom_sid("S-2-0-0")) + + def test_from_sddl_invalidtype1(self): + self.assertRaises(TypeError, security.descriptor.from_sddl, security.dom_sid('S-2-0-0-512'),security.dom_sid("S-2-0-0")) + + def test_from_sddl_invalidtype2(self): + sddl = "O:AOG:DAD:(A;;RPWPCCDCLCSWRCWDWOGA;;;S-1-0-0)" + self.assertRaises(TypeError, security.descriptor.from_sddl, sddl, + "S-2-0-0") + + def test_as_sddl(self): + text = "O:AOG:DAD:(A;;RPWPCCDCLCSWRCWDWOGA;;;S-1-0-0)" + dom = security.dom_sid("S-2-0-0") + desc1 = security.descriptor.from_sddl(text, dom) + desc2 = security.descriptor.from_sddl(desc1.as_sddl(dom), dom) + self.assertEquals(desc1.group_sid, desc2.group_sid) + self.assertEquals(desc1.owner_sid, desc2.owner_sid) + self.assertEquals(desc1.sacl, desc2.sacl) + self.assertEquals(desc1.type, desc2.type) + + def test_as_sddl_invalid(self): + text = "O:AOG:DAD:(A;;RPWPCCDCLCSWRCWDWOGA;;;S-1-0-0)" + dom = security.dom_sid("S-2-0-0") + desc1 = security.descriptor.from_sddl(text, dom) + self.assertRaises(TypeError, desc1.as_sddl,text) + + + def test_as_sddl_no_domainsid(self): + dom = security.dom_sid("S-2-0-0") + text = "O:AOG:DAD:(A;;RPWPCCDCLCSWRCWDWOGA;;;S-1-0-0)" + desc1 = security.descriptor.from_sddl(text, dom) + desc2 = security.descriptor.from_sddl(desc1.as_sddl(), dom) + self.assertEquals(desc1.group_sid, desc2.group_sid) + self.assertEquals(desc1.owner_sid, desc2.owner_sid) + self.assertEquals(desc1.sacl, desc2.sacl) + self.assertEquals(desc1.type, desc2.type) + + def test_domsid_nodomsid_as_sddl(self): + dom = security.dom_sid("S-2-0-0") + text = "O:AOG:DAD:(A;;RPWPCCDCLCSWRCWDWOGA;;;S-1-0-0)" + desc1 = security.descriptor.from_sddl(text, dom) + self.assertNotEqual(desc1.as_sddl(), desc1.as_sddl(dom)) + + def test_split(self): + dom = security.dom_sid("S-2-0-7") + self.assertEquals((security.dom_sid("S-2-0"), 7), dom.split()) + + +class DomSidTests(samba.tests.TestCase): + + def test_parse_sid(self): + sid = security.dom_sid("S-1-5-21") + self.assertEquals("S-1-5-21", str(sid)) + + def test_sid_equal(self): + sid1 = security.dom_sid("S-1-5-21") + sid2 = security.dom_sid("S-1-5-21") + self.assertEquals(sid1, sid1) + self.assertEquals(sid1, sid2) + + def test_random(self): + sid = security.random_sid() + self.assertTrue(str(sid).startswith("S-1-5-21-")) + + def test_repr(self): + sid = security.random_sid() + self.assertTrue(repr(sid).startswith("dom_sid('S-1-5-21-")) + + +class PrivilegeTests(samba.tests.TestCase): + + def test_privilege_name(self): + self.assertEquals("SeShutdownPrivilege", + security.privilege_name(security.SEC_PRIV_SHUTDOWN)) + + def test_privilege_id(self): + self.assertEquals(security.SEC_PRIV_SHUTDOWN, + security.privilege_id("SeShutdownPrivilege")) + diff --git a/python/samba/tests/source.py b/python/samba/tests/source.py new file mode 100644 index 00000000000..2612ae68cf5 --- /dev/null +++ b/python/samba/tests/source.py @@ -0,0 +1,264 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2011 +# +# Loosely based on bzrlib's test_source.py +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Source level Python tests.""" + +import errno +import os +import re +import warnings + +import samba +samba.ensure_external_module("pep8", "pep8") +import pep8 + +from samba.tests import ( + TestCase, + ) + + +def get_python_source_files(): + """Iterate over all Python source files.""" + library_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "samba")) + assert os.path.isdir(library_dir), library_dir + + for root, dirs, files in os.walk(library_dir): + for f in files: + if f.endswith(".py"): + yield os.path.abspath(os.path.join(root, f)) + + bindir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "..", "..", "bin")) + assert os.path.isdir(bindir), bindir + for f in os.listdir(bindir): + p = os.path.abspath(os.path.join(bindir, f)) + if not os.path.islink(p): + continue + target = os.readlink(p) + if os.path.dirname(target).endswith("scripting/bin"): + yield p + wafsambadir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "..", "..", "buildtools", "wafsamba")) + assert os.path.isdir(wafsambadir), wafsambadir + for root, dirs, files in os.walk(wafsambadir): + for f in files: + if f.endswith(".py"): + yield os.path.abspath(os.path.join(root, f)) + + +def get_source_file_contents(): + """Iterate over the contents of all python files.""" + for fname in get_python_source_files(): + try: + f = open(fname, 'rb') + except IOError, e: + if e.errno == errno.ENOENT: + warnings.warn("source file %s broken link?" % fname) + continue + else: + raise + try: + text = f.read() + finally: + f.close() + yield fname, text + + +class TestSource(TestCase): + + def test_copyright(self): + """Test that all Python files have a valid copyright statement.""" + incorrect = [] + + copyright_re = re.compile('#\\s*copyright.*(?=\n)', re.I) + + for fname, text in get_source_file_contents(): + if fname.endswith("ms_schema.py"): + # FIXME: Not sure who holds copyright on ms_schema.py + continue + if "wafsamba" in fname: + # FIXME: No copyright headers in wafsamba + continue + match = copyright_re.search(text) + if not match: + incorrect.append((fname, 'no copyright line found\n')) + + if incorrect: + help_text = ["Some files have missing or incorrect copyright" + " statements.", + "", + ] + for fname, comment in incorrect: + help_text.append(fname) + help_text.append((' ' * 4) + comment) + + self.fail('\n'.join(help_text)) + + def test_gpl(self): + """Test that all .py files have a GPL disclaimer.""" + incorrect = [] + + gpl_txt = """ +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" + gpl_re = re.compile(re.escape(gpl_txt), re.MULTILINE) + + for fname, text in get_source_file_contents(): + if "wafsamba" in fname: + # FIXME: License to wafsamba hasn't been clarified yet + continue + if not gpl_re.search(text): + incorrect.append(fname) + + if incorrect: + help_text = ['Some files have missing or incomplete GPL statement', + gpl_txt] + for fname in incorrect: + help_text.append((' ' * 4) + fname) + + self.fail('\n'.join(help_text)) + + def _push_file(self, dict_, fname, line_no): + if fname not in dict_: + dict_[fname] = [line_no] + else: + dict_[fname].append(line_no) + + def _format_message(self, dict_, message): + files = ["%s: %s" % (f, ', '.join([str(i + 1) for i in lines])) + for f, lines in dict_.items()] + files.sort() + return message + '\n\n %s' % ('\n '.join(files)) + + def _iter_source_files_lines(self): + for fname, text in get_source_file_contents(): + lines = text.splitlines(True) + last_line_no = len(lines) - 1 + for line_no, line in enumerate(lines): + yield fname, line_no, line + + def test_no_tabs(self): + """Check that there are no tabs in Python files.""" + tabs = {} + for fname, line_no, line in self._iter_source_files_lines(): + if '\t' in line: + self._push_file(tabs, fname, line_no) + if tabs: + self.fail(self._format_message(tabs, + 'Tab characters were found in the following source files.' + '\nThey should either be replaced by "\\t" or by spaces:')) + + def test_unix_newlines(self): + """Check for unix new lines.""" + illegal_newlines = {} + for fname, line_no, line in self._iter_source_files_lines(): + if not line.endswith('\n') or line.endswith('\r\n'): + self._push_file(illegal_newlines, fname, line_no) + if illegal_newlines: + self.fail(self._format_message(illegal_newlines, + 'Non-unix newlines were found in the following source files:')) + + def test_trailing_whitespace(self): + """Check that there is not trailing whitespace in Python files.""" + trailing_whitespace = {} + for fname, line_no, line in self._iter_source_files_lines(): + if line.rstrip("\n").endswith(" "): + self._push_file(trailing_whitespace, fname, line_no) + if trailing_whitespace: + self.fail(self._format_message(trailing_whitespace, + 'Trailing whitespace was found in the following source files.')) + + def test_shebang_lines(self): + """Check that files with shebang lines and only those are executable.""" + files_with_shebang = {} + files_without_shebang= {} + for fname, line_no, line in self._iter_source_files_lines(): + if line_no >= 1: + continue + executable = (os.stat(fname).st_mode & 0111) + has_shebang = line.startswith("#!") + if has_shebang and not executable: + self._push_file(files_with_shebang, fname, line_no) + if not has_shebang and executable: + self._push_file(files_without_shebang, fname, line_no) + if files_with_shebang: + self.fail(self._format_message(files_with_shebang, + 'Files with shebang line that are not executable:')) + if files_without_shebang: + self.fail(self._format_message(files_without_shebang, + 'Files without shebang line that are executable:')) + + pep8_ignore = [ + 'E401', # multiple imports on one line + 'E501', # line too long + 'E251', # no spaces around keyword / parameter equals + 'E201', # whitespace after '[' + 'E202', # whitespace before ')' + 'E302', # expected 2 blank lines, found 1 + 'E231', # missing whitespace after ',' + 'E225', # missing whitespace around operator + 'E111', # indentation is not a multiple of four + 'E261', # at least two spaces before inline comment + 'E702', # multiple statements on one line (semicolon) + 'E221', # multiple spaces before operator + 'E303', # too many blank lines (2) + 'E203', # whitespace before ':' + 'E222', # multiple spaces after operator + 'E301', # expected 1 blank line, found 0 + 'E211', # whitespace before '(' + 'E701', # multiple statements on one line (colon) + ] + + def test_pep8(self): + pep8.process_options() + pep8.options.repeat = True + pep8_errors = [] + pep8_warnings = [] + for fname, text in get_source_file_contents(): + def report_error(line_number, offset, text, check): + code = text[:4] + if code in self.pep8_ignore: + code = 'W' + code[1:] + text = code + text[4:] + print "%s:%s: %s" % (fname, line_number, text) + summary = (fname, line_number, offset, text, check) + if code[0] == 'W': + pep8_warnings.append(summary) + else: + pep8_errors.append(summary) + lines = text.splitlines(True) + checker = pep8.Checker(fname, lines) + checker.report_error = report_error + checker.check_all() + if len(pep8_errors) > 0: + d = {} + for (fname, line_no, offset, text, check) in pep8_errors: + d.setdefault(fname, []).append(line_no - 1) + self.fail(self._format_message(d, + 'There were %d PEP8 errors:' % len(pep8_errors))) + diff --git a/python/samba/tests/strings.py b/python/samba/tests/strings.py new file mode 100644 index 00000000000..23382d756ec --- /dev/null +++ b/python/samba/tests/strings.py @@ -0,0 +1,103 @@ +# subunit test cases for Samba string functions. + +# Copyright (C) 2003 by Martin Pool <mbp@samba.org> +# Copyright (C) 2011 Andrew Bartlett +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +# XXX: All this code assumes that the Unix character set is UTF-8, +# which is the most common setting. I guess it would be better to +# force it to that value while running the tests. I'm not sure of the +# best way to do that yet. +# +# -- mbp + +from unicodenames import * + +import samba.tests +from samba import strcasecmp_m, strstr_m + +def signum(a): + if a < 0: + return -1 + elif a > 0: + return +1 + else: + return 0 + + +class strcasecmp_m_Tests(samba.tests.TestCase): + """String comparisons in simple ASCII and unicode""" + def test_strcasecmp_m(self): + # A, B, strcasecmp(A, B) + cases = [('hello', 'hello', 0), + ('hello', 'goodbye', +1), + ('goodbye', 'hello', -1), + ('hell', 'hello', -1), + ('', '', 0), + ('a', '', +1), + ('', 'a', -1), + ('a', 'A', 0), + ('aa', 'aA', 0), + ('Aa', 'aa', 0), + ('longstring ' * 100, 'longstring ' * 100, 0), + ('longstring ' * 100, 'longstring ' * 100 + 'a', -1), + ('longstring ' * 100 + 'a', 'longstring ' * 100, +1), + (KATAKANA_LETTER_A, KATAKANA_LETTER_A, 0), + (KATAKANA_LETTER_A, 'a', 1), + ] + for a, b, expect in cases: + self.assertEquals(signum(strcasecmp_m(a.encode('utf-8'), + b.encode('utf-8'))), + expect) + +class strstr_m_Tests(samba.tests.TestCase): + """strstr_m tests in simple ASCII and unicode strings""" + + def test_strstr_m(self): + # A, B, strstr_m(A, B) + cases = [('hello', 'hello', 'hello'), + ('hello', 'goodbye', None), + ('goodbye', 'hello', None), + ('hell', 'hello', None), + ('hello', 'hell', 'hello'), + ('', '', ''), + ('a', '', 'a'), + ('', 'a', None), + ('a', 'A', None), + ('aa', 'aA', None), + ('Aa', 'aa', None), + ('%v foo', '%v', '%v foo'), + ('foo %v foo', '%v', '%v foo'), + ('foo %v', '%v', '%v'), + ('longstring ' * 100, 'longstring ' * 99, 'longstring ' * 100), + ('longstring ' * 99, 'longstring ' * 100, None), + ('longstring a' * 99, 'longstring ' * 100 + 'a', None), + ('longstring ' * 100 + 'a', 'longstring ' * 100, 'longstring ' * 100 + 'a'), + (KATAKANA_LETTER_A, KATAKANA_LETTER_A + 'bcd', None), + (KATAKANA_LETTER_A + 'bcde', KATAKANA_LETTER_A + 'bcd', KATAKANA_LETTER_A + 'bcde'), + ('d'+KATAKANA_LETTER_A + 'bcd', KATAKANA_LETTER_A + 'bcd', KATAKANA_LETTER_A + 'bcd'), + ('d'+KATAKANA_LETTER_A + 'bd', KATAKANA_LETTER_A + 'bcd', None), + + ('e'+KATAKANA_LETTER_A + 'bcdf', KATAKANA_LETTER_A + 'bcd', KATAKANA_LETTER_A + 'bcdf'), + (KATAKANA_LETTER_A, KATAKANA_LETTER_A + 'bcd', None), + (KATAKANA_LETTER_A*3, 'a', None), + ] + for a, b, expect in cases: + if expect is not None: + expect = expect.encode('utf-8') + self.assertEquals(strstr_m(a.encode('utf-8'), + b.encode('utf-8')), + expect) diff --git a/python/samba/tests/unicodenames.py b/python/samba/tests/unicodenames.py new file mode 100644 index 00000000000..ed65de6651f --- /dev/null +++ b/python/samba/tests/unicodenames.py @@ -0,0 +1,29 @@ +# Copyright (C) 2003 by Martin Pool <mbp@samba.org> + +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +""" +Defines symbolic names for a few UNICODE characters, to make test +source code more readable on machines that don't have all the +necessary fonts. + +You can do "import *" on this file safely. +""" + +LATIN_CAPITAL_LETTER_N_WITH_TILDE = u'\u004e' +LATIN_CAPITAL_LETTER_O_WITH_DIARESIS = u'\u00d6' +LATIN_SMALL_LETTER_O_WITH_DIARESIS = u'\u00f6' + +KATAKANA_LETTER_A = u'\u30a2' diff --git a/python/samba/tests/upgrade.py b/python/samba/tests/upgrade.py new file mode 100644 index 00000000000..b46a4173191 --- /dev/null +++ b/python/samba/tests/upgrade.py @@ -0,0 +1,40 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.upgrade.""" + +from samba.upgrade import import_wins +from samba.tests import LdbTestCase + + +class WinsUpgradeTests(LdbTestCase): + + def test_upgrade(self): + winsdb = { + "FOO#20": (200, ["127.0.0.1", "127.0.0.2"], 0x60) + } + import_wins(self.ldb, winsdb) + + self.assertEquals( + ['name=FOO,type=0x20'], + [str(m.dn) for m in + self.ldb.search(expression="(objectClass=winsRecord)")]) + + def test_version(self): + import_wins(self.ldb, {}) + self.assertEquals("VERSION", + str(self.ldb.search(expression="(objectClass=winsMaxVersion)")[0]["cn"])) diff --git a/python/samba/tests/upgradeprovision.py b/python/samba/tests/upgradeprovision.py new file mode 100644 index 00000000000..93a6731c830 --- /dev/null +++ b/python/samba/tests/upgradeprovision.py @@ -0,0 +1,135 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007-2008 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.upgradeprovision.""" + +import os +from samba.upgradehelpers import (usn_in_range, dn_sort, + get_diff_sddls, update_secrets, + construct_existor_expr) + +from samba.tests.provision import create_dummy_secretsdb +from samba.tests import TestCaseInTempDir +from samba import Ldb +from ldb import SCOPE_BASE +import samba.tests + +def dummymessage(a=None, b=None): + pass + + +class UpgradeProvisionTestCase(TestCaseInTempDir): + """Some simple tests for individual functions in the provisioning code. + """ + def test_usn_in_range(self): + range = [5, 25, 35, 55] + + vals = [3, 26, 56] + + for v in vals: + self.assertFalse(usn_in_range(v, range)) + + vals = [5, 20, 25, 35, 36] + + for v in vals: + self.assertTrue(usn_in_range(v, range)) + + def test_dn_sort(self): + # higher level comes after lower even if lexicographicaly closer + # ie dc=tata,dc=toto (2 levels), comes after dc=toto + # even if dc=toto is lexicographicaly after dc=tata, dc=toto + self.assertEquals(dn_sort("dc=tata,dc=toto", "dc=toto"), 1) + self.assertEquals(dn_sort("dc=zata", "dc=tata"), 1) + self.assertEquals(dn_sort("dc=toto,dc=tata", + "cn=foo,dc=toto,dc=tata"), -1) + self.assertEquals(dn_sort("cn=bar, dc=toto,dc=tata", + "cn=foo, dc=toto,dc=tata"), -1) + + def test_get_diff_sddl(self): + sddl = "O:SAG:DUD:AI(A;CIID;RPWPCRCCLCLORCWOWDSW;;;SA)\ +(A;CIID;RP LCLORC;;;AU)(A;CIID;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)S:AI(AU;CIIDSA;WP;;;WD)" + sddl1 = "O:SAG:DUD:AI(A;CIID;RPWPCRCCLCLORCWOWDSW;;;SA)\ +(A;CIID;RP LCLORC;;;AU)(A;CIID;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)S:AI(AU;CIIDSA;WP;;;WD)" + sddl2 = "O:BAG:DUD:AI(A;CIID;RPWPCRCCLCLORCWOWDSW;;;SA)\ +(A;CIID;RP LCLORC;;;AU)(A;CIID;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)S:AI(AU;CIIDSA;WP;;;WD)" + sddl3 = "O:SAG:BAD:AI(A;CIID;RPWPCRCCLCLORCWOWDSW;;;SA)\ +(A;CIID;RP LCLORC;;;AU)(A;CIID;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)S:AI(AU;CIIDSA;WP;;;WD)" + sddl4 = "O:SAG:DUD:AI(A;CIID;RPWPCRCCLCLORCWOWDSW;;;BA)\ +(A;CIID;RP LCLORC;;;AU)(A;CIID;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)S:AI(AU;CIIDSA;WP;;;WD)" + sddl5 = "O:SAG:DUD:AI(A;CIID;RPWPCRCCLCLORCWOWDSW;;;SA)\ +(A;CIID;RP LCLORC;;;AU)(A;CIID;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)" + + self.assertEquals(get_diff_sddls(sddl, sddl1), "") + txt = get_diff_sddls(sddl, sddl2) + self.assertEquals(txt, "\tOwner mismatch: SA (in ref) BA(in current)\n") + txt = get_diff_sddls(sddl, sddl3) + self.assertEquals(txt, "\tGroup mismatch: DU (in ref) BA(in current)\n") + txt = get_diff_sddls(sddl, sddl4) + txtmsg = "\tPart dacl is different between reference and current here\ + is the detail:\n\t\t(A;CIID;RPWPCRCCLCLORCWOWDSW;;;BA) ACE is not present in\ + the reference\n\t\t(A;CIID;RPWPCRCCLCLORCWOWDSW;;;SA) ACE is not present in\ + the current\n" + self.assertEquals(txt, txtmsg) + txt = get_diff_sddls(sddl, sddl5) + self.assertEquals(txt, "\tCurrent ACL hasn't a sacl part\n") + + def test_construct_existor_expr(self): + res = construct_existor_expr([]) + self.assertEquals(res, "") + + res = construct_existor_expr(["foo"]) + self.assertEquals(res, "(|(foo=*))") + + res = construct_existor_expr(["foo", "bar"]) + self.assertEquals(res, "(|(foo=*)(bar=*))") + + +class UpdateSecretsTests(samba.tests.TestCaseInTempDir): + + def setUp(self): + super(UpdateSecretsTests, self).setUp() + self.referencedb = create_dummy_secretsdb( + os.path.join(self.tempdir, "ref.ldb")) + + def _getEmptyDb(self): + return Ldb(os.path.join(self.tempdir, "secrets.ldb")) + + def _getCurrentFormatDb(self): + return create_dummy_secretsdb( + os.path.join(self.tempdir, "secrets.ldb")) + + def test_trivial(self): + # Test that updating an already up-to-date secretsdb works fine + self.secretsdb = self._getCurrentFormatDb() + self.assertEquals(None, + update_secrets(self.referencedb, self.secretsdb, dummymessage)) + + def test_update_modules(self): + empty_db = self._getEmptyDb() + update_secrets(self.referencedb, empty_db, dummymessage) + newmodules = empty_db.search(base="@MODULES", scope=SCOPE_BASE) + refmodules = self.referencedb.search(base="@MODULES", scope=SCOPE_BASE) + self.assertEquals(newmodules.msgs, refmodules.msgs) + + def tearDown(self): + for name in ["ref.ldb", "secrets.ldb", "secrets.tdb", "secrets.tdb.bak", "secrets.ntdb"]: + path = os.path.join(self.tempdir, name) + if os.path.exists(path): + os.unlink(path) + super(UpdateSecretsTests, self).tearDown() + + diff --git a/python/samba/tests/upgradeprovisionneeddc.py b/python/samba/tests/upgradeprovisionneeddc.py new file mode 100644 index 00000000000..a7cb298ed7f --- /dev/null +++ b/python/samba/tests/upgradeprovisionneeddc.py @@ -0,0 +1,179 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2007-2008 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.upgradeprovision that need a DC.""" + +import os +import re +import shutil + +from samba import param +from samba.credentials import Credentials +from samba.auth import system_session +from samba.provision import getpolicypath,find_provision_key_parameters +from samba.upgradehelpers import (get_paths, get_ldbs, + identic_rename, + updateOEMInfo, getOEMInfo, update_gpo, + delta_update_basesamdb, + update_dns_account_password, + search_constructed_attrs_stored, + increment_calculated_keyversion_number) +from samba.tests import env_loadparm, TestCaseInTempDir +from samba.tests.provision import create_dummy_secretsdb +import ldb + + +def dummymessage(a=None, b=None): + pass + +smb_conf_path = "%s/%s/%s" % (os.environ["SELFTEST_PREFIX"], "dc", "etc/smb.conf") + +class UpgradeProvisionBasicLdbHelpersTestCase(TestCaseInTempDir): + """Some simple tests for individual functions in the provisioning code. + """ + + def test_get_ldbs(self): + paths = get_paths(param, None, smb_conf_path) + creds = Credentials() + lp = env_loadparm() + creds.guess(lp) + get_ldbs(paths, creds, system_session(), lp) + + def test_find_key_param(self): + paths = get_paths(param, None, smb_conf_path) + creds = Credentials() + lp = env_loadparm() + creds.guess(lp) + rootdn = "dc=samba,dc=example,dc=com" + ldbs = get_ldbs(paths, creds, system_session(), lp) + names = find_provision_key_parameters(ldbs.sam, ldbs.secrets, ldbs.idmap, + paths, smb_conf_path, lp) + self.assertEquals(names.realm, "SAMBA.EXAMPLE.COM") + self.assertEquals(str(names.rootdn).lower(), rootdn.lower()) + self.assertNotEquals(names.policyid_dc, None) + self.assertNotEquals(names.ntdsguid, "") + + +class UpgradeProvisionWithLdbTestCase(TestCaseInTempDir): + + def _getEmptyDbName(self): + return os.path.join(self.tempdir, "sam.ldb") + + def setUp(self): + super(UpgradeProvisionWithLdbTestCase, self).setUp() + paths = get_paths(param, None, smb_conf_path) + self.creds = Credentials() + self.lp = env_loadparm() + self.creds.guess(self.lp) + self.paths = paths + self.ldbs = get_ldbs(paths, self.creds, system_session(), self.lp) + self.names = find_provision_key_parameters(self.ldbs.sam, + self.ldbs.secrets, self.ldbs.idmap, paths, smb_conf_path, + self.lp) + self.referencedb = create_dummy_secretsdb( + os.path.join(self.tempdir, "ref.ldb")) + + def test_search_constructed_attrs_stored(self): + hashAtt = search_constructed_attrs_stored(self.ldbs.sam, + self.names.rootdn, + ["msds-KeyVersionNumber"]) + self.assertFalse(hashAtt.has_key("msds-KeyVersionNumber")) + + def test_increment_calculated_keyversion_number(self): + dn = "CN=Administrator,CN=Users,%s" % self.names.rootdn + # We conctruct a simple hash for the user administrator + hash = {} + # And we want the version to be 140 + hash[dn.lower()] = 140 + + increment_calculated_keyversion_number(self.ldbs.sam, + self.names.rootdn, + hash) + self.assertEqual(self.ldbs.sam.get_attribute_replmetadata_version(dn, + "unicodePwd"), + 140) + # This function should not decrement the version + hash[dn.lower()] = 130 + + increment_calculated_keyversion_number(self.ldbs.sam, + self.names.rootdn, + hash) + self.assertEqual(self.ldbs.sam.get_attribute_replmetadata_version(dn, + "unicodePwd"), + 140) + + def test_identic_rename(self): + rootdn = "DC=samba,DC=example,DC=com" + + guestDN = ldb.Dn(self.ldbs.sam, "CN=Guest,CN=Users,%s" % rootdn) + identic_rename(self.ldbs.sam, guestDN) + res = self.ldbs.sam.search(expression="(name=Guest)", base=rootdn, + scope=ldb.SCOPE_SUBTREE, attrs=["dn"]) + self.assertEquals(len(res), 1) + self.assertEquals(str(res[0]["dn"]), "CN=Guest,CN=Users,%s" % rootdn) + + def test_delta_update_basesamdb(self): + dummysampath = self._getEmptyDbName() + delta_update_basesamdb(self.paths.samdb, dummysampath, + self.creds, system_session(), self.lp, + dummymessage) + + def test_update_gpo_simple(self): + dir = getpolicypath(self.paths.sysvol, self.names.dnsdomain, + self.names.policyid) + shutil.rmtree(dir) + self.assertFalse(os.path.isdir(dir)) + update_gpo(self.paths, self.ldbs.sam, self.names, self.lp, dummymessage) + self.assertTrue(os.path.isdir(dir)) + + def test_update_gpo_acl(self): + path = os.path.join(self.tempdir, "testupdategpo") + save = self.paths.sysvol + self.paths.sysvol = path + os.mkdir(path) + os.mkdir(os.path.join(path, self.names.dnsdomain)) + os.mkdir(os.path.join(os.path.join(path, self.names.dnsdomain), + "Policies")) + update_gpo(self.paths, self.ldbs.sam, self.names, self.lp, dummymessage) + shutil.rmtree(path) + self.paths.sysvol = save + + def test_getOEMInfo(self): + realm = self.lp.get("realm") + basedn = "DC=%s" % realm.replace(".", ", DC=") + oem = getOEMInfo(self.ldbs.sam, basedn) + self.assertNotEquals(oem, "") + + def test_update_dns_account(self): + update_dns_account_password(self.ldbs.sam, self.ldbs.secrets, + self.names) + + def test_updateOEMInfo(self): + realm = self.lp.get("realm") + basedn = "DC=%s" % realm.replace(".", ", DC=") + oem = getOEMInfo(self.ldbs.sam, basedn) + updateOEMInfo(self.ldbs.sam, basedn) + oem2 = getOEMInfo(self.ldbs.sam, basedn) + self.assertNotEquals(str(oem), str(oem2)) + self.assertTrue(re.match(".*upgrade to.*", str(oem2))) + + def tearDown(self): + for name in ["ref.ldb", "secrets.ldb", "secrets.tdb", "secrets.tdb.bak", "secrets.ntdb", "sam.ldb"]: + path = os.path.join(self.tempdir, name) + if os.path.exists(path): + os.unlink(path) + super(UpgradeProvisionWithLdbTestCase, self).tearDown() diff --git a/python/samba/tests/xattr.py b/python/samba/tests/xattr.py new file mode 100644 index 00000000000..89add284566 --- /dev/null +++ b/python/samba/tests/xattr.py @@ -0,0 +1,126 @@ +# Unix SMB/CIFS implementation. Tests for xattr manipulation +# Copyright (C) Matthieu Patou <mat@matws.net> 2009 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for samba.xattr_native and samba.xattr_tdb.""" + +import samba.xattr_native, samba.xattr_tdb +from samba.xattr import copytree_with_xattrs +from samba.dcerpc import xattr +from samba.ndr import ndr_pack +from samba.tests import ( + TestCase, + TestCaseInTempDir, + TestSkipped, + ) +import random +import shutil +import os + +class XattrTests(TestCase): + + def _tmpfilename(self): + random.seed() + path = os.environ['SELFTEST_PREFIX'] + return os.path.join(path, "pytests"+str(int(100000*random.random()))) + + def _eadbpath(self): + return os.path.join(os.environ['SELFTEST_PREFIX'], "eadb.tdb") + + def test_set_xattr_native(self): + if not samba.xattr_native.is_xattr_supported(): + raise TestSkipped() + ntacl = xattr.NTACL() + ntacl.version = 1 + tempf = self._tmpfilename() + open(tempf, 'w').write("empty") + try: + samba.xattr_native.wrap_setxattr(tempf, "user.unittests", + ndr_pack(ntacl)) + except IOError: + raise TestSkipped("the filesystem where the tests are runned do not support XATTR") + os.unlink(tempf) + + def test_set_and_get_native(self): + if not samba.xattr_native.is_xattr_supported(): + raise TestSkipped() + tempf = self._tmpfilename() + reftxt = "this is a test" + open(tempf, 'w').write("empty") + try: + samba.xattr_native.wrap_setxattr(tempf, "user.unittests", reftxt) + text = samba.xattr_native.wrap_getxattr(tempf, "user.unittests") + self.assertEquals(text, reftxt) + except IOError: + raise TestSkipped("the filesystem where the tests are runned do not support XATTR") + os.unlink(tempf) + + def test_set_xattr_tdb(self): + tempf = self._tmpfilename() + eadb_path = self._eadbpath() + ntacl = xattr.NTACL() + ntacl.version = 1 + open(tempf, 'w').write("empty") + try: + samba.xattr_tdb.wrap_setxattr(eadb_path, + tempf, "user.unittests", ndr_pack(ntacl)) + finally: + os.unlink(tempf) + os.unlink(eadb_path) + + def test_set_tdb_not_open(self): + tempf = self._tmpfilename() + ntacl = xattr.NTACL() + ntacl.version = 1 + open(tempf, 'w').write("empty") + try: + self.assertRaises(IOError, samba.xattr_tdb.wrap_setxattr, + os.path.join("nonexistent", "eadb.tdb"), tempf, + "user.unittests", ndr_pack(ntacl)) + finally: + os.unlink(tempf) + + def test_set_and_get_tdb(self): + tempf = self._tmpfilename() + eadb_path = self._eadbpath() + reftxt = "this is a test" + open(tempf, 'w').write("empty") + try: + samba.xattr_tdb.wrap_setxattr(eadb_path, tempf, "user.unittests", + reftxt) + text = samba.xattr_tdb.wrap_getxattr(eadb_path, tempf, + "user.unittests") + self.assertEquals(text, reftxt) + finally: + os.unlink(tempf) + os.unlink(eadb_path) + + +class TestCopyTreeWithXattrs(TestCaseInTempDir): + + def test_simple(self): + os.chdir(self.tempdir) + os.mkdir("a") + os.mkdir("a/b") + os.mkdir("a/b/c") + f = open('a/b/c/d', 'w') + try: + f.write("foo") + finally: + f.close() + copytree_with_xattrs("a", "b") + shutil.rmtree("a") + shutil.rmtree("b") |