summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/util/tests/test_s4_logging.c197
-rw-r--r--lib/util/wscript_build8
-rw-r--r--python/samba/tests/logfiles.py373
-rw-r--r--selftest/tests.py1
4 files changed, 579 insertions, 0 deletions
diff --git a/lib/util/tests/test_s4_logging.c b/lib/util/tests/test_s4_logging.c
new file mode 100644
index 00000000000..8e2b09deba8
--- /dev/null
+++ b/lib/util/tests/test_s4_logging.c
@@ -0,0 +1,197 @@
+/*
+ Unix SMB/CIFS implementation.
+
+ A test server that only does logging.
+
+ Copyright (C) Andrew Tridgell 1992-2005
+ Copyright (C) Martin Pool 2002
+ Copyright (C) Jelmer Vernooij 2002
+ Copyright (C) James J Myers 2003 <myersjj@samba.org>
+ Copyright (C) Douglas Bagnall 2022
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "includes.h"
+#include "lib/cmdline/cmdline.h"
+
+#define BINARY_NAME "test_s4_logging"
+
+static int log_level = 1;
+
+
+struct debug_class {
+ int dclass;
+ const char *name;
+};
+
+struct debug_class classes_table[] = {
+ {DBGC_ALL, "all"},
+ {DBGC_TDB, "tdb"},
+ {DBGC_PRINTDRIVERS, "printdrivers"},
+ {DBGC_LANMAN, "lanman"},
+ {DBGC_SMB, "smb"},
+ {DBGC_RPC_PARSE, "rpc_parse"},
+ {DBGC_RPC_SRV, "rpc_srv"},
+ {DBGC_RPC_CLI, "rpc_cli"},
+ {DBGC_PASSDB, "passdb"},
+ {DBGC_SAM, "sam"},
+ {DBGC_AUTH, "auth"},
+ {DBGC_WINBIND, "winbind"},
+ {DBGC_VFS, "vfs"},
+ {DBGC_IDMAP, "idmap"},
+ {DBGC_QUOTA, "quota"},
+ {DBGC_ACLS, "acls"},
+ {DBGC_LOCKING, "locking"},
+ {DBGC_MSDFS, "msdfs"},
+ {DBGC_DMAPI, "dmapi"},
+ {DBGC_REGISTRY, "registry"},
+ {DBGC_SCAVENGER, "scavenger"},
+ {DBGC_DNS, "dns"},
+ {DBGC_LDB, "ldb"},
+ {DBGC_TEVENT, "tevent"},
+ {DBGC_AUTH_AUDIT, "auth_audit"},
+ {DBGC_AUTH_AUDIT_JSON, "auth_json_audit"},
+ {DBGC_KERBEROS, "kerberos"},
+ {DBGC_DRS_REPL, "drs_repl"},
+ {DBGC_SMB2, "smb2"},
+ {DBGC_SMB2_CREDITS, "smb2_credits"},
+ {DBGC_DSDB_AUDIT, "dsdb_audit"},
+ {DBGC_DSDB_AUDIT_JSON, "dsdb_json_audit"},
+ {DBGC_DSDB_PWD_AUDIT, "dsdb_password_audit"},
+ {DBGC_DSDB_PWD_AUDIT_JSON, "dsdb_password_json_audit"},
+ {DBGC_DSDB_TXN_AUDIT, "dsdb_transaction_audit"},
+ {DBGC_DSDB_TXN_AUDIT_JSON, "dsdb_transaction_json_audit"},
+ {DBGC_DSDB_GROUP_AUDIT, "dsdb_group_audit"},
+ {DBGC_DSDB_GROUP_AUDIT_JSON, "dsdb_group_json_audit"},
+};
+
+
+static int log_all_classes(int level)
+{
+ size_t i;
+ struct debug_class c;
+ for (i = 0; i < ARRAY_SIZE(classes_table); i++) {
+ c = classes_table[i];
+ if (i != c.dclass) {
+ /*
+ * we implicitly rely on these values staying in the
+ * right order.
+ */
+ fprintf(stderr,
+ "expected '%s' class to have value %zu\n",
+ c.name, i);
+ return 1;
+ }
+
+ DEBUGC(c.dclass, level,
+ ("logging for '%s' [%d], at level %d\n",
+ c.name, c.dclass, level));
+
+ /*
+ * That's it for the tests *here*. The invoker of this
+ * process will have set up an smb.conf that directs the
+ * output in particular ways, and will be looking to see that
+ * happens correctly.
+ */
+ }
+ return 0;
+}
+
+
+static int init_daemon(TALLOC_CTX *mem_ctx,
+ int argc,
+ const char *argv[],
+ const char **error)
+{
+ poptContext pc;
+ int opt;
+ bool ok;
+ struct poptOption long_options[] = {
+ POPT_AUTOHELP
+ {
+ .longName = "level",
+ .shortName = 'L',
+ .argInfo = POPT_ARG_INT,
+ .arg = &log_level,
+ .descrip = "log at this level",
+ .argDescrip = "LEVEL",
+ },
+ POPT_COMMON_SAMBA
+ POPT_COMMON_DAEMON
+ POPT_COMMON_VERSION
+ POPT_TABLEEND
+ };
+
+ setproctitle(BINARY_NAME);
+
+ ok = samba_cmdline_init(mem_ctx,
+ SAMBA_CMDLINE_CONFIG_SERVER,
+ true /* require_smbconf */);
+ if (!ok) {
+ *error = "Failed to init cmdline parser!\n";
+ return EINVAL;
+ }
+
+ pc = samba_popt_get_context(BINARY_NAME,
+ argc,
+ argv,
+ long_options,
+ 0);
+ if (pc == NULL) {
+ *error = "Failed to setup popt context!\n";
+ return ENOTRECOVERABLE;
+ }
+
+ while((opt = poptGetNextOpt(pc)) != -1) {
+ fprintf(stderr, "\nInvalid option %s: %s\n\n",
+ poptBadOption(pc, 0), poptStrerror(opt));
+ poptPrintUsage(pc, stderr, 0);
+ return 1;
+ }
+
+ poptFreeContext(pc);
+
+ return 0;
+}
+
+
+int main(int argc, const char *argv[])
+{
+ TALLOC_CTX *mem_ctx = NULL;
+ int rc;
+ const char *error = NULL;
+
+ mem_ctx = talloc_init("crazy-logging-test-server.c#main");
+ if (mem_ctx == NULL) {
+ exit(ENOMEM);
+ }
+
+ setproctitle_init(argc, discard_const(argv), environ);
+
+ rc = init_daemon(mem_ctx, argc, argv, &error);
+ if (rc != 0) {
+ fprintf(stderr, "error [%d]: %s\n", rc, error);
+ exit_daemon(error, rc);
+ }
+
+ rc = log_all_classes(log_level);
+ if (rc != 0) {
+ fprintf(stderr, "error in log_all_classes [%d]\n", rc);
+ exit_daemon("logging error", rc);
+ }
+
+ TALLOC_FREE(mem_ctx);
+ return rc;
+}
diff --git a/lib/util/wscript_build b/lib/util/wscript_build
index e3fa3295b46..334057c98a7 100644
--- a/lib/util/wscript_build
+++ b/lib/util/wscript_build
@@ -367,3 +367,11 @@ else:
deps='cmocka replace samba-util',
local_include=False,
for_selftest=True)
+
+ bld.SAMBA_BINARY('test_s4_logging',
+ source='tests/test_s4_logging.c',
+ deps=' '.join(['CMDLINE_S4',
+ 'samba-util',
+ 'popt']),
+ local_include=False,
+ for_selftest=True)
diff --git a/python/samba/tests/logfiles.py b/python/samba/tests/logfiles.py
new file mode 100644
index 00000000000..ac3487271b6
--- /dev/null
+++ b/python/samba/tests/logfiles.py
@@ -0,0 +1,373 @@
+# Unix SMB/CIFS implementation.
+#
+# Copyright (C) Catalyst.Net Ltd. 2022
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+import subprocess
+import os
+from samba.tests import TestCaseInTempDir
+from pprint import pprint
+
+HERE = os.path.dirname(__file__)
+SERVER = os.path.join(HERE, '../../../../bin/test_s4_logging')
+
+CLASS_LIST = ["all", "tdb", "printdrivers", "lanman", "smb",
+ "rpc_parse", "rpc_srv", "rpc_cli", "passdb", "sam", "auth",
+ "winbind", "vfs", "idmap", "quota", "acls", "locking", "msdfs",
+ "dmapi", "registry", "scavenger", "dns", "ldb", "tevent",
+ "auth_audit", "auth_json_audit", "kerberos", "drs_repl",
+ "smb2", "smb2_credits", "dsdb_audit", "dsdb_json_audit",
+ "dsdb_password_audit", "dsdb_password_json_audit",
+ "dsdb_transaction_audit", "dsdb_transaction_json_audit",
+ "dsdb_group_audit", "dsdb_group_json_audit"]
+
+
+CLASS_CODES = {k: i for i, k in enumerate(CLASS_LIST)}
+
+
+class S4LoggingTests(TestCaseInTempDir):
+
+ def _write_smb_conf(self,
+ default_level=2,
+ default_file="default",
+ mapping=()):
+ self.smbconf = os.path.join(self.tempdir, "smb.conf")
+
+ with open(self.smbconf, "w") as f:
+ f.write('[global]\n')
+ if default_file is not None:
+ dest = os.path.join(self.tempdir,
+ default_file)
+ f.write(f" log file = {dest}\n")
+
+ f.write(" log level = ")
+ if default_level:
+ f.write(f"{default_level}")
+
+ for dbg_class, log_level, log_file in mapping:
+ f.write(' ')
+ f.write(dbg_class)
+ if log_level is not None:
+ f.write(f':{log_level}')
+ if log_file is not None:
+ dest = os.path.join(self.tempdir,
+ log_file)
+
+ f.write(f'@{dest}')
+ f.write('\n')
+ self.addCleanup(os.unlink, self.smbconf)
+
+ def _extract_log_level_line(self, new_level=2):
+ # extricate the 'log level' line from the smb.conf, returning
+ # the value, and replacing the log level line with something
+ # innocuous.
+ smbconf2 = self.smbconf + 'new'
+ with open(self.smbconf) as f:
+ with open(smbconf2, 'w') as f2:
+ for line in f:
+ if 'log level' in line:
+ debug_arg = line.split('=', 1)[1].strip()
+ if new_level is not None:
+ f2.write(f' log level = {new_level}\n')
+ else:
+ f2.write(line)
+ os.replace(smbconf2, self.smbconf)
+ return debug_arg
+
+ def _get_expected_strings(self, mapping,
+ level_filter,
+ default_file='default',
+ file_filter=None):
+ default = os.path.join(self.tempdir, default_file)
+ expected = {default: []}
+ # this kind of thing:
+ # " logging for 'dns' [21], at level 4"
+ for dbg_class, log_level, log_file in mapping:
+ if log_file is None:
+ log_file = default_file
+
+ f = os.path.join(self.tempdir, log_file)
+ expected.setdefault(f, [])
+ if log_level < level_filter:
+ continue
+ if file_filter not in (None, log_file):
+ continue
+ s = (f" logging for '{dbg_class}' [{CLASS_CODES[dbg_class]}], "
+ f"at level {level_filter}")
+ expected[f].append(s)
+
+ return expected
+
+ def _run_s4_logger(self, log_level, *extra_args):
+ cmd = [SERVER,
+ '-s', self.smbconf,
+ '-L', str(log_level),
+ *extra_args]
+
+ p = subprocess.run(cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ self.assertEqual(p.returncode, 0,
+ f"'{' '.join(cmd)}' failed ({p.returncode})")
+
+ return p.stdout.decode(), p.stderr.decode()
+
+ def assert_string_contains(self, string, expected_lines,
+ filename=None):
+ expected_lines = set(expected_lines)
+ string_lines = set(string.split('\n'))
+ present_lines = string_lines & expected_lines
+ if present_lines != expected_lines:
+ if filename:
+ print(filename)
+ print("expected %d lines, found %d" %
+ (len(expected_lines), len(present_lines)))
+ print("missing lines:")
+ pprint(expected_lines - present_lines)
+ raise AssertionError("missing lines")
+
+ def assert_file_contains(self, filename, expected_lines):
+ with open(filename) as f:
+ string = f.read()
+ self.assert_string_contains(string, expected_lines, filename)
+
+ def assert_n_known_lines_string(self, string, n):
+ count = string.count("logging for '")
+ if count != n:
+ raise AssertionError(
+ f"string has {count} lines, expected {n}")
+
+ def assert_n_known_lines(self, filename, n):
+ with open(filename) as f:
+ string = f.read()
+ count = string.count(" logging for '")
+ if count != n:
+ raise AssertionError(
+ f"{filename} has {count} lines, expected {n}")
+
+ def assert_unlink_expected_strings(self, expected_strings):
+ for k, v in expected_strings.items():
+ if not os.path.exists(k):
+ self.fail(f"{k} does not exist")
+ self.assert_file_contains(k, v)
+ self.assert_n_known_lines(k, len(v))
+ os.unlink(k)
+
+ def test_each_to_its_own(self):
+ level = 4
+ mapping = [(x, level, x) for x in CLASS_LIST]
+ expected_strings = self._get_expected_strings(mapping, level)
+
+ self._write_smb_conf(mapping=mapping)
+ stdout, stderr = self._run_s4_logger(level)
+ self.assert_unlink_expected_strings(expected_strings)
+
+ def test_all_to_one(self):
+ level = 4
+ dest = 'everything'
+ mapping = [(x, level, dest) for x in CLASS_LIST]
+ expected_strings = self._get_expected_strings(mapping, level)
+
+ self._write_smb_conf(mapping=mapping)
+ stdout, stderr = self._run_s4_logger(level)
+ self.assert_unlink_expected_strings(expected_strings)
+
+ def test_bifurcate(self):
+ level = 4
+ dests = ['even', 'odd']
+ mapping = [(x, level + 1, dests[i & 1])
+ for i, x in enumerate(CLASS_LIST)]
+ expected_strings = self._get_expected_strings(mapping, level)
+
+ self._write_smb_conf(mapping=mapping)
+ stdout, stderr = self._run_s4_logger(level)
+ self.assert_unlink_expected_strings(expected_strings)
+
+ def test_bifurcate_level_out_of_range(self):
+ # nothing will be logged, because we're logging at a too high
+ # level.
+ level = 4
+ dests = ['even', 'odd']
+ mapping = [(x, level - 1, dests[i & 1])
+ for i, x in enumerate(CLASS_LIST)]
+ expected_strings = self._get_expected_strings(mapping, level)
+
+ self._write_smb_conf(mapping=mapping)
+ stdout, stderr = self._run_s4_logger(level)
+ self.assert_unlink_expected_strings(expected_strings)
+
+ def test_bifurcate_misc_log_level(self):
+ # We are sending even numbers to default and odd numbers to
+ # 'odd', at various levels, depending on mod 3. Like this:
+ #
+ # log level = 2 all:5 \
+ # tdb:4@odd \
+ # printdrivers:3 \
+ # lanman:5@odd \
+ # smb:4 \
+ # rpc_parse:3@odd \
+ # rpc_srv:5 ...
+ #
+ # Therefore, 'default' should get classes that are (0 or 4) % 6
+ # and 'odd' should get classes that are (1 or 3) % 6.
+
+ level = 4
+ dests = [None, 'odd']
+ mapping = []
+ for i, x in enumerate(CLASS_LIST):
+ parity = i & 1
+ log_level = level + 1 - (i % 3)
+ mapping.append((x, log_level, dests[parity]))
+
+ expected_strings = self._get_expected_strings(mapping, level)
+
+ self._write_smb_conf(mapping=mapping)
+ stdout, stderr = self._run_s4_logger(level)
+ self.assert_unlink_expected_strings(expected_strings)
+
+ def test_all_different_ways_cmdline_d(self):
+ level = 4
+ dests = [None, 'a', 'b', 'c']
+ mapping = []
+ seed = 123
+ for i, x in enumerate(CLASS_LIST):
+ d = seed & 3
+ seed = seed * 17 + 1
+ log_level = seed % 10
+ seed &= 0xff
+ mapping.append((x, log_level, dests[d]))
+
+ expected_strings = self._get_expected_strings(mapping, level)
+
+ self._write_smb_conf(mapping=mapping)
+ debug_arg = self._extract_log_level_line(26)
+
+ stdout, stderr = self._run_s4_logger(level, '-d', debug_arg)
+ self.assert_unlink_expected_strings(expected_strings)
+
+ def test_all_different_ways_cmdline_d_interactive(self):
+ level = 4
+ dests = [None, 'a', 'b', 'c']
+ mapping = []
+ seed = 1234
+ for i, x in enumerate(CLASS_LIST):
+ d = seed & 3
+ seed = seed * 13 + 1
+ log_level = seed % 10
+ seed &= 0xff
+ mapping.append((x, log_level, dests[d]))
+
+ expected_strings = self._get_expected_strings(mapping, level)
+
+ self._write_smb_conf(mapping=mapping)
+ debug_arg = self._extract_log_level_line(None)
+ stdout, stderr = self._run_s4_logger(level, '-d', debug_arg, '-i')
+ expected_lines = []
+ for v in expected_strings.values():
+ # stderr doesn't end up with leading ' '
+ expected_lines.extend([x.strip() for x in v])
+
+ self.assert_string_contains(stderr, expected_lines)
+ self.assert_n_known_lines_string(stderr, len(expected_lines))
+
+ def test_only_some_level_0(self):
+ # running the logger with -L 0 makes the log messages run at
+ # level 0 (i.e DBG_ERR), so we always see them in default,
+ # even though smb.conf doesn't ask.
+ mapping = [(x, 3, ['default', 'bees']['b' in x])
+ for x in CLASS_LIST]
+ expected_strings = self._get_expected_strings(mapping, 0)
+ self._write_smb_conf(mapping=[x for x in mapping if x[2] == 'bees'])
+ stdout, stderr = self._run_s4_logger(0)
+ self.assert_unlink_expected_strings(expected_strings)
+
+ def test_only_some_level_3(self):
+ # here, we're expecting the unmentioned non-b classes to just
+ # disappear.
+ level = 3
+ mapping = [(x, level, 'bees') for x in CLASS_LIST if 'b' in x]
+ expected_strings = self._get_expected_strings(mapping, level)
+ self._write_smb_conf(mapping=[x for x in mapping if x[2] == 'bees'])
+ stdout, stderr = self._run_s4_logger(level)
+ self.assert_unlink_expected_strings(expected_strings)
+
+ def test_none(self):
+ level = 4
+ mapping = []
+ expected_strings = self._get_expected_strings(mapping, level)
+ self._write_smb_conf(mapping=mapping)
+ stdout, stderr = self._run_s4_logger(level)
+ self.assert_unlink_expected_strings(expected_strings)
+
+ def test_none_high_default(self):
+ # We set the default level to 5 and do nothing else special,
+ # which means we need a different mapping for the smb.conf
+ # than the expected strings.
+ level = 4
+ mapping = [(x, 5, 'default') for x in CLASS_LIST]
+ expected_strings = self._get_expected_strings(mapping, level)
+ # note the empty mapping in smb.conf
+ self._write_smb_conf(mapping=[], default_level=5)
+ stdout, stderr = self._run_s4_logger(level)
+ self.assert_unlink_expected_strings(expected_strings)
+
+ def test_none_high_cmdline_d(self):
+ # We set the default level to 2, but run the 'server' with -d 10.
+ level = 4
+ mapping = [(x, 10, 'default') for x in CLASS_LIST]
+ expected_strings = self._get_expected_strings(mapping, level)
+ # note the empty mapping in smb.conf
+ self._write_smb_conf(mapping=[])
+ stdout, stderr = self._run_s4_logger(level, '-d', '10')
+ self.assert_unlink_expected_strings(expected_strings)
+
+ def test_interactive_high_default_simple(self):
+ # running with -i should send everything to stderr.
+ level = 4
+ mapping = [(x, 5, 'default') for x in CLASS_LIST]
+ expected_strings = self._get_expected_strings(mapping, level)
+ self._write_smb_conf(mapping=[], default_level=5)
+ stdout, stderr = self._run_s4_logger(level, '-i')
+ expected_lines = []
+ for v in expected_strings.values():
+ # stderr doesn't end up with leading ' '
+ expected_lines.extend([x.strip() for x in v])
+
+ self.assert_string_contains(stderr, expected_lines)
+
+ def test_interactive_complex_smb_conf(self):
+ # running with -i should send everything to stderr. The
+ # smb.conf will set the levels, but the target files are
+ # overridden.
+ # (this is the test_bifurcate_misc_log_level() smb.conf).
+ level = 4
+ dests = [None, 'odd']
+ mapping = []
+ for i, x in enumerate(CLASS_LIST):
+ parity = i & 1
+ log_level = level + 1 - (i % 3)
+ mapping.append((x, log_level, dests[parity]))
+
+ expected_strings = self._get_expected_strings(mapping, level)
+
+ self._write_smb_conf(mapping=mapping)
+ stdout, stderr = self._run_s4_logger(level, '-i')
+ expected_lines = []
+ for v in expected_strings.values():
+ # stderr doesn't end up with leading ' '
+ expected_lines.extend([x.strip() for x in v])
+
+ self.assert_string_contains(stderr, expected_lines)
diff --git a/selftest/tests.py b/selftest/tests.py
index 48d23692e82..82d210d7a46 100644
--- a/selftest/tests.py
+++ b/selftest/tests.py
@@ -92,6 +92,7 @@ planpythontestsuite("none", "samba.tests.s3idmapdb")
planpythontestsuite("none", "samba.tests.samba3sam")
planpythontestsuite("none", "samba.tests.dsdb_api")
planpythontestsuite("none", "samba.tests.smbconf")
+planpythontestsuite("none", "samba.tests.logfiles")
planpythontestsuite(
"none", "wafsamba.tests.test_suite",
extra_path=[os.path.join(samba4srcdir, "..", "buildtools"),