summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Bartlett <abartlet@samba.org>2017-03-14 16:43:06 +1300
committerAndrew Bartlett <abartlet@samba.org>2017-03-29 02:37:25 +0200
commit3ee82de26df77f97abe1ca70c69f2b7c47421207 (patch)
treeddbd97d6d15e0e6de1ea3be115ccc655a439e925
parent41f1da3a1ae0335ad485118c14394b98b9890abe (diff)
downloadsamba-3ee82de26df77f97abe1ca70c69f2b7c47421207.tar.gz
auth_log: Add tests by listening for JSON messages over the message bus
Signed-off-by: Andrew Bartlett <abartlet@samba.org> Signed-off-by: Gary Lockyer <gary@catalyst.net.nz> Pair-programmed-by: Gary Lockyer <gary@catalyst.net.nz>
-rw-r--r--python/samba/tests/auth_log.py801
-rw-r--r--python/samba/tests/auth_log_base.py102
-rw-r--r--python/samba/tests/auth_log_ncalrpc.py104
-rw-r--r--selftest/knownfail2
-rwxr-xr-xsource4/selftest/tests.py10
5 files changed, 1019 insertions, 0 deletions
diff --git a/python/samba/tests/auth_log.py b/python/samba/tests/auth_log.py
new file mode 100644
index 00000000000..abd2952fbb0
--- /dev/null
+++ b/python/samba/tests/auth_log.py
@@ -0,0 +1,801 @@
+# Unix SMB/CIFS implementation.
+# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+"""Tests for the Auth and AuthZ logging.
+"""
+
+from samba import auth
+import samba.tests
+from samba.messaging import Messaging
+from samba.dcerpc.messaging import MSG_AUTH_LOG, AUTH_EVENT_NAME
+from samba.dcerpc import srvsvc, dnsserver
+import time
+import json
+import os
+from samba import smb
+from samba.samdb import SamDB
+import samba.tests.auth_log_base
+from samba.credentials import Credentials, DONT_USE_KERBEROS, MUST_USE_KERBEROS
+from samba import NTSTATUSError
+from subprocess import call
+
+class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase):
+
+ def setUp(self):
+ super(AuthLogTests, self).setUp()
+ self.remoteAddress = os.environ["CLIENT_IP"]
+
+ def tearDown(self):
+ super(AuthLogTests, self).tearDown()
+
+
+
+ def _test_rpc_ncacn_np(self, authTypes, creds, service,
+ binding, protection, checkFunction):
+ def isLastExpectedMessage( msg):
+ return (
+ msg["type"] == "Authorization" and
+ ( msg["Authorization"]["serviceDescription"] == "DCE/RPC" or
+ msg["Authorization"]["serviceDescription"] == service) and
+ msg["Authorization"]["authType"] == authTypes[0] and
+ msg["Authorization"]["transportProtection"] == protection
+ )
+
+ if binding:
+ binding = "[%s]" % binding
+
+ if service == "dnsserver":
+ x = dnsserver.dnsserver("ncacn_np:%s%s" % (self.server, binding),
+ self.get_loadparm(),
+ creds)
+ elif service == "srvsvc":
+ x = srvsvc.srvsvc("ncacn_np:%s%s" % (self.server, binding),
+ self.get_loadparm(),
+ creds)
+
+ # The connection is passed to ensure the server
+ # messaging context stays up until all the messages have been received.
+ messages = self.waitForMessages(isLastExpectedMessage, x)
+ checkFunction(messages, authTypes, service, binding, protection)
+
+ def rpc_ncacn_np_ntlm_check(self, messages, authTypes, service,
+ binding, protection):
+
+ expected_messages = len(authTypes)
+ self.assertEquals(expected_messages,
+ len(messages),
+ "Did not receive the expected number of messages")
+
+ # Check the first message it should be an Authentication
+ msg = messages[0]
+ self.assertEquals("Authentication", msg["type"])
+ self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+ self.assertEquals("SMB",
+ msg["Authentication"]["serviceDescription"])
+ self.assertEquals(authTypes[1], msg["Authentication"]["authDescription"])
+
+ # Check the second message it should be an Authorization
+ msg = messages[1]
+ self.assertEquals("Authorization", msg["type"])
+ self.assertEquals("SMB",
+ msg["Authorization"]["serviceDescription"])
+ self.assertEquals(authTypes[2], msg["Authorization"]["authType"])
+ self.assertEquals("SMB", msg["Authorization"]["transportProtection"])
+
+ # Check the third message it should be an Authentication
+ # if we are expecting 4 messages
+ if expected_messages == 4:
+ def checkServiceDescription( desc):
+ return (desc == "DCE/RPC" or desc == service)
+
+ msg = messages[2]
+ self.assertEquals("Authentication", msg["type"])
+ self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+ self.assertTrue(
+ checkServiceDescription( msg["Authentication"]["serviceDescription"]))
+
+ self.assertEquals(authTypes[3], msg["Authentication"]["authDescription"])
+
+ def rpc_ncacn_np_krb5_check(self, messages, authTypes, service, binding, protection):
+
+ expected_messages = len(authTypes)
+ self.assertEquals(expected_messages,
+ len(messages),
+ "Did not receive the expected number of messages")
+
+ # Check the first message it should be an Authentication
+ # This is almost certainly Authentication over UDP, and is probably
+ # returning message too big,
+ msg = messages[0]
+ self.assertEquals("Authentication", msg["type"])
+ self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+ self.assertEquals("Kerberos KDC",
+ msg["Authentication"]["serviceDescription"])
+ self.assertEquals(authTypes[1], msg["Authentication"]["authDescription"])
+
+ # Check the second message it should be an Authentication
+ # This this the TCP Authentication in response to the message too big
+ # response to the UDP Authentication
+ msg = messages[1]
+ self.assertEquals("Authentication", msg["type"])
+ self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+ self.assertEquals("Kerberos KDC",
+ msg["Authentication"]["serviceDescription"])
+ self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"])
+
+ # Check the third message it should be an Authorization
+ msg = messages[2]
+ self.assertEquals("Authorization", msg["type"])
+ serviceDescription = "SMB"
+ print "binding %s" % binding
+ if binding == "[smb2]":
+ serviceDescription = "SMB2"
+
+ self.assertEquals(serviceDescription,
+ msg["Authorization"]["serviceDescription"])
+ self.assertEquals(authTypes[3], msg["Authorization"]["authType"])
+ self.assertEquals("SMB", msg["Authorization"]["transportProtection"])
+
+
+ def test_rpc_ncacn_np_ntlm_dns_sign(self):
+ creds = self.insta_creds(template=self.get_credentials(),
+ kerberos_state=DONT_USE_KERBEROS)
+ self._test_rpc_ncacn_np(["NTLMSSP",
+ "NTLMSSP",
+ "NTLMSSP",
+ "NTLMSSP"],
+ creds, "dnsserver", "sign", "SIGN",
+ self.rpc_ncacn_np_ntlm_check)
+
+ def test_rpc_ncacn_np_ntlm_srv_sign(self):
+ creds = self.insta_creds(template=self.get_credentials(),
+ kerberos_state=DONT_USE_KERBEROS)
+ self._test_rpc_ncacn_np(["NTLMSSP",
+ "NTLMSSP",
+ "NTLMSSP",
+ "NTLMSSP"],
+ creds, "srvsvc", "sign", "SIGN",
+ self.rpc_ncacn_np_ntlm_check)
+
+ def test_rpc_ncacn_np_ntlm_dns(self):
+ creds = self.insta_creds(template=self.get_credentials(),
+ kerberos_state=DONT_USE_KERBEROS)
+ self._test_rpc_ncacn_np(["ncacn_np",
+ "NTLMSSP",
+ "NTLMSSP"],
+ creds, "dnsserver", "", "SMB",
+ self.rpc_ncacn_np_ntlm_check)
+
+ def test_rpc_ncacn_np_ntlm_srv(self):
+ creds = self.insta_creds(template=self.get_credentials(),
+ kerberos_state=DONT_USE_KERBEROS)
+ self._test_rpc_ncacn_np(["ncacn_np",
+ "NTLMSSP",
+ "NTLMSSP"],
+ creds, "srvsvc", "", "SMB",
+ self.rpc_ncacn_np_ntlm_check)
+
+ def test_rpc_ncacn_np_krb_dns_sign(self):
+ creds = self.insta_creds(template=self.get_credentials(),
+ kerberos_state=MUST_USE_KERBEROS)
+ self._test_rpc_ncacn_np(["krb5",
+ "ENC-TS Pre-authentication",
+ "ENC-TS Pre-authentication",
+ "krb5"],
+ creds, "dnsserver", "sign", "SIGN",
+ self.rpc_ncacn_np_krb5_check)
+
+ def test_rpc_ncacn_np_krb_srv_sign(self):
+ creds = self.insta_creds(template=self.get_credentials(),
+ kerberos_state=MUST_USE_KERBEROS)
+ self._test_rpc_ncacn_np(["krb5",
+ "ENC-TS Pre-authentication",
+ "ENC-TS Pre-authentication",
+ "krb5"],
+ creds, "srvsvc", "sign", "SIGN",
+ self.rpc_ncacn_np_krb5_check)
+
+ def test_rpc_ncacn_np_krb_dns(self):
+ creds = self.insta_creds(template=self.get_credentials(),
+ kerberos_state=MUST_USE_KERBEROS)
+ self._test_rpc_ncacn_np(["ncacn_np",
+ "ENC-TS Pre-authentication",
+ "ENC-TS Pre-authentication",
+ "krb5"],
+ creds, "dnsserver", "", "SMB",
+ self.rpc_ncacn_np_krb5_check)
+
+ def test_rpc_ncacn_np_krb_dns_smb2(self):
+ creds = self.insta_creds(template=self.get_credentials(),
+ kerberos_state=MUST_USE_KERBEROS)
+ self._test_rpc_ncacn_np(["ncacn_np",
+ "ENC-TS Pre-authentication",
+ "ENC-TS Pre-authentication",
+ "krb5"],
+ creds, "dnsserver", "smb2", "SMB",
+ self.rpc_ncacn_np_krb5_check)
+
+ def test_rpc_ncacn_np_krb_srv(self):
+ creds = self.insta_creds(template=self.get_credentials(),
+ kerberos_state=MUST_USE_KERBEROS)
+ self._test_rpc_ncacn_np(["ncacn_np",
+ "ENC-TS Pre-authentication",
+ "ENC-TS Pre-authentication",
+ "krb5"],
+ creds, "srvsvc", "", "SMB",
+ self.rpc_ncacn_np_krb5_check)
+
+ def _test_rpc_ncacn_ip_tcp(self, authTypes, creds, service,
+ binding, protection, checkFunction):
+ def isLastExpectedMessage( msg):
+ return (
+ msg["type"] == "Authorization" and
+ msg["Authorization"]["serviceDescription"] == "DCE/RPC" and
+ msg["Authorization"]["authType"] == authTypes[0] and
+ msg["Authorization"]["transportProtection"] == protection
+ )
+
+ if binding:
+ binding = "[%s]" % binding
+
+ if service == "dnsserver":
+ dnsserver.dnsserver("ncacn_ip_tcp:%s%s" % (self.server, binding),
+ self.get_loadparm(),
+ creds)
+ elif service == "srvsvc":
+ srvsvc.srvsvc("ncacn_ip_tcp:%s%s" % (self.server, binding),
+ self.get_loadparm(),
+ creds)
+
+
+ messages = self.waitForMessages( isLastExpectedMessage)
+ checkFunction(messages, authTypes, service, binding, protection)
+
+ def rpc_ncacn_ip_tcp_ntlm_check(self, messages, authTypes, service,
+ binding, protection):
+
+ expected_messages = len(authTypes)
+ self.assertEquals(expected_messages,
+ len(messages),
+ "Did not receive the expected number of messages")
+
+ # Check the first message it should be an Authorization
+ msg = messages[0]
+ self.assertEquals("Authorization", msg["type"])
+ self.assertEquals("DCE/RPC",
+ msg["Authorization"]["serviceDescription"])
+ self.assertEquals(authTypes[1], msg["Authorization"]["authType"])
+ self.assertEquals("NONE", msg["Authorization"]["transportProtection"])
+
+ # Check the second message it should be an Authentication
+ msg = messages[1]
+ self.assertEquals("Authentication", msg["type"])
+ self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+ self.assertEquals("DCE/RPC",
+ msg["Authentication"]["serviceDescription"])
+ self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"])
+
+ def rpc_ncacn_ip_tcp_krb5_check(self, messages, authTypes, service,
+ binding, protection):
+
+ expected_messages = len(authTypes)
+ self.assertEquals(expected_messages,
+ len(messages),
+ "Did not receive the expected number of messages")
+
+ # Check the first message it should be an Authorization
+ msg = messages[0]
+ self.assertEquals("Authorization", msg["type"])
+ self.assertEquals("DCE/RPC",
+ msg["Authorization"]["serviceDescription"])
+ self.assertEquals(authTypes[1], msg["Authorization"]["authType"])
+ self.assertEquals("NONE", msg["Authorization"]["transportProtection"])
+
+ # Check the second message it should be an Authentication
+ msg = messages[1]
+ self.assertEquals("Authentication", msg["type"])
+ self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+ self.assertEquals("Kerberos KDC",
+ msg["Authentication"]["serviceDescription"])
+ self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"])
+
+ # Check the third message it should be an Authentication
+ msg = messages[2]
+ self.assertEquals("Authentication", msg["type"])
+ self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+ self.assertEquals("Kerberos KDC",
+ msg["Authentication"]["serviceDescription"])
+ self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"])
+
+ def test_rpc_ncacn_ip_tcp_ntlm_dns_sign(self):
+ creds = self.insta_creds(template=self.get_credentials(),
+ kerberos_state=DONT_USE_KERBEROS)
+ self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
+ "ncacn_ip_tcp",
+ "NTLMSSP"],
+ creds, "dnsserver", "sign", "SIGN",
+ self.rpc_ncacn_ip_tcp_ntlm_check)
+
+ def test_rpc_ncacn_ip_tcp_krb5_dns_sign(self):
+ creds = self.insta_creds(template=self.get_credentials(),
+ kerberos_state=MUST_USE_KERBEROS)
+ self._test_rpc_ncacn_ip_tcp(["krb5",
+ "ncacn_ip_tcp",
+ "ENC-TS Pre-authentication",
+ "ENC-TS Pre-authentication"],
+ creds, "dnsserver", "sign", "SIGN",
+ self.rpc_ncacn_ip_tcp_krb5_check)
+
+ def test_rpc_ncacn_ip_tcp_ntlm_dns(self):
+ creds = self.insta_creds(template=self.get_credentials(),
+ kerberos_state=DONT_USE_KERBEROS)
+ self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
+ "ncacn_ip_tcp",
+ "NTLMSSP"],
+ creds, "dnsserver", "", "SIGN",
+ self.rpc_ncacn_ip_tcp_ntlm_check)
+
+ def test_rpc_ncacn_ip_tcp_krb5_dns(self):
+ creds = self.insta_creds(template=self.get_credentials(),
+ kerberos_state=MUST_USE_KERBEROS)
+ self._test_rpc_ncacn_ip_tcp(["krb5",
+ "ncacn_ip_tcp",
+ "ENC-TS Pre-authentication",
+ "ENC-TS Pre-authentication"],
+ creds, "dnsserver", "", "SIGN",
+ self.rpc_ncacn_ip_tcp_krb5_check)
+
+ def test_rpc_ncacn_ip_tcp_ntlm_dns_connect(self):
+ creds = self.insta_creds(template=self.get_credentials(),
+ kerberos_state=DONT_USE_KERBEROS)
+ self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
+ "ncacn_ip_tcp",
+ "NTLMSSP"],
+ creds, "dnsserver", "connect", "NONE",
+ self.rpc_ncacn_ip_tcp_ntlm_check)
+
+ def test_rpc_ncacn_ip_tcp_krb5_dns_connect(self):
+ creds = self.insta_creds(template=self.get_credentials(),
+ kerberos_state=MUST_USE_KERBEROS)
+ self._test_rpc_ncacn_ip_tcp(["krb5",
+ "ncacn_ip_tcp",
+ "ENC-TS Pre-authentication",
+ "ENC-TS Pre-authentication"],
+ creds, "dnsserver", "connect", "NONE",
+ self.rpc_ncacn_ip_tcp_krb5_check)
+
+ def test_rpc_ncacn_ip_tcp_ntlm_dns_seal(self):
+ creds = self.insta_creds(template=self.get_credentials(),
+ kerberos_state=DONT_USE_KERBEROS)
+ self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
+ "ncacn_ip_tcp",
+ "NTLMSSP"],
+ creds, "dnsserver", "seal", "SEAL",
+ self.rpc_ncacn_ip_tcp_ntlm_check)
+
+ def test_rpc_ncacn_ip_tcp_krb5_dns_seal(self):
+ creds = self.insta_creds(template=self.get_credentials(),
+ kerberos_state=MUST_USE_KERBEROS)
+ self._test_rpc_ncacn_ip_tcp(["krb5",
+ "ncacn_ip_tcp",
+ "ENC-TS Pre-authentication",
+ "ENC-TS Pre-authentication"],
+ creds, "dnsserver", "seal", "SEAL",
+ self.rpc_ncacn_ip_tcp_krb5_check)
+
+ def test_ldap(self):
+
+ def isLastExpectedMessage( msg):
+ return (msg["type"] == "Authorization" and
+ msg["Authorization"]["serviceDescription"] == "LDAP" and
+ msg["Authorization"]["transportProtection"] == "SIGN" and
+ msg["Authorization"]["authType"] == "krb5")
+
+ self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER"],
+ lp = self.get_loadparm(),
+ credentials=self.get_credentials())
+
+ messages = self.waitForMessages( isLastExpectedMessage)
+ self.assertEquals(3,
+ len(messages),
+ "Did not receive the expected number of messages")
+
+ # Check the first message it should be an Authentication
+ msg = messages[0]
+ self.assertEquals("Authentication", msg["type"])
+ self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+ self.assertEquals("Kerberos KDC",
+ msg["Authentication"]["serviceDescription"])
+ self.assertEquals("ENC-TS Pre-authentication",
+ msg["Authentication"]["authDescription"])
+
+ # Check the first message it should be an Authentication
+ msg = messages[1]
+ self.assertEquals("Authentication", msg["type"])
+ self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+ self.assertEquals("Kerberos KDC",
+ msg["Authentication"]["serviceDescription"])
+ self.assertEquals("ENC-TS Pre-authentication",
+ msg["Authentication"]["authDescription"])
+
+ def test_ldap_ntlm(self):
+
+ def isLastExpectedMessage( msg):
+ return (msg["type"] == "Authorization" and
+ msg["Authorization"]["serviceDescription"] == "LDAP" and
+ msg["Authorization"]["transportProtection"] == "SEAL" and
+ msg["Authorization"]["authType"] == "NTLMSSP")
+
+ self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER_IP"],
+ lp = self.get_loadparm(),
+ credentials=self.get_credentials())
+
+ messages = self.waitForMessages( isLastExpectedMessage)
+ self.assertEquals(2,
+ len(messages),
+ "Did not receive the expected number of messages")
+ # Check the first message it should be an Authentication
+ msg = messages[0]
+ self.assertEquals("Authentication", msg["type"])
+ self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+ self.assertEquals("LDAP",
+ msg["Authentication"]["serviceDescription"])
+ self.assertEquals("NTLMSSP", msg["Authentication"]["authDescription"])
+
+ def test_ldap_simple_bind(self):
+ def isLastExpectedMessage( msg):
+ return (msg["type"] == "Authorization" and
+ msg["Authorization"]["serviceDescription"] == "LDAP" and
+ msg["Authorization"]["transportProtection"] == "TLS" and
+ msg["Authorization"]["authType"] == "simple bind")
+
+ creds = self.insta_creds(template=self.get_credentials())
+ creds.set_bind_dn("%s\\%s" % (creds.get_domain(),
+ creds.get_username()))
+
+ self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
+ lp = self.get_loadparm(),
+ credentials=creds)
+
+ messages = self.waitForMessages( isLastExpectedMessage)
+ self.assertEquals(2,
+ len(messages),
+ "Did not receive the expected number of messages")
+
+ # Check the first message it should be an Authentication
+ msg = messages[0]
+ self.assertEquals("Authentication", msg["type"])
+ self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+ self.assertEquals("LDAP",
+ msg["Authentication"]["serviceDescription"])
+ self.assertEquals("simple bind",
+ msg["Authentication"]["authDescription"])
+
+ def test_smb(self):
+ def isLastExpectedMessage( msg):
+ return (msg["type"] == "Authorization" and
+ msg["Authorization"]["serviceDescription"] == "SMB" and
+ msg["Authorization"]["authType"] == "krb5" and
+ msg["Authorization"]["transportProtection"] == "SMB")
+
+ creds = self.insta_creds(template=self.get_credentials())
+ smb.SMB(self.server,
+ "sysvol",
+ lp=self.get_loadparm(),
+ creds=creds)
+
+ messages = self.waitForMessages( isLastExpectedMessage)
+ self.assertEquals(3,
+ len(messages),
+ "Did not receive the expected number of messages")
+ # Check the first message it should be an Authentication
+ msg = messages[0]
+ self.assertEquals("Authentication", msg["type"])
+ self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+ self.assertEquals("Kerberos KDC",
+ msg["Authentication"]["serviceDescription"])
+ self.assertEquals("ENC-TS Pre-authentication",
+ msg["Authentication"]["authDescription"])
+
+ # Check the second message it should be an Authentication
+ msg = messages[1]
+ self.assertEquals("Authentication", msg["type"])
+ self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+ self.assertEquals("Kerberos KDC",
+ msg["Authentication"]["serviceDescription"])
+ self.assertEquals("ENC-TS Pre-authentication",
+ msg["Authentication"]["authDescription"])
+
+ def test_smb_bad_password(self):
+ def isLastExpectedMessage( msg):
+ return (msg["type"] == "Authentication" and
+ msg["Authentication"]["serviceDescription"]
+ == "Kerberos KDC" and
+ msg["Authentication"]["status"]
+ == "NT_STATUS_WRONG_PASSWORD" and
+ msg["Authentication"]["authDescription"]
+ == "ENC-TS Pre-authentication")
+
+ creds = self.insta_creds(template=self.get_credentials())
+ creds.set_password("badPassword")
+
+ thrown = False
+ try:
+ smb.SMB(self.server,
+ "sysvol",
+ lp=self.get_loadparm(),
+ creds=creds)
+ except NTSTATUSError:
+ thrown = True
+ self.assertEquals( thrown, True)
+
+ messages = self.waitForMessages( isLastExpectedMessage)
+ self.assertEquals(1,
+ len(messages),
+ "Did not receive the expected number of messages")
+
+
+ def test_smb_bad_user(self):
+ def isLastExpectedMessage( msg):
+ return (msg["type"] == "Authentication" and
+ msg["Authentication"]["serviceDescription"]
+ == "Kerberos KDC" and
+ msg["Authentication"]["status"]
+ == "NT_STATUS_NO_SUCH_USER" and
+ msg["Authentication"]["authDescription"]
+ == "ENC-TS Pre-authentication")
+
+ creds = self.insta_creds(template=self.get_credentials())
+ creds.set_username("badUser")
+
+ thrown = False
+ try:
+ smb.SMB(self.server,
+ "sysvol",
+ lp=self.get_loadparm(),
+ creds=creds)
+ except NTSTATUSError:
+ thrown = True
+ self.assertEquals( thrown, True)
+
+ messages = self.waitForMessages( isLastExpectedMessage)
+ self.assertEquals(1,
+ len(messages),
+ "Did not receive the expected number of messages")
+
+ def test_smb_anonymous(self):
+ def isLastExpectedMessage( msg):
+ return (msg["type"] == "Authorization" and
+ msg["Authorization"]["serviceDescription"] == "SMB" and
+ msg["Authorization"]["authType"] == "NTLMSSP" and
+ msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
+ msg["Authorization"]["transportProtection"] == "SMB")
+
+ server = os.environ["SERVER"]
+
+ path = "//%s/IPC$" % server
+ auth = "-N"
+ call(["bin/smbclient", path, auth, "-c quit"])
+
+ messages = self.waitForMessages( isLastExpectedMessage)
+ self.assertEquals(3,
+ len(messages),
+ "Did not receive the expected number of messages")
+
+ # Check the first message it should be an Authentication
+ msg = messages[0]
+ self.assertEquals("Authentication", msg["type"])
+ self.assertEquals("NT_STATUS_NO_SUCH_USER",
+ msg["Authentication"]["status"])
+ self.assertEquals("SMB",
+ msg["Authentication"]["serviceDescription"])
+ self.assertEquals("NTLMSSP",
+ msg["Authentication"]["authDescription"])
+ self.assertEquals("No-Password",
+ msg["Authentication"]["passwordType"])
+
+ # Check the second message it should be an Authentication
+ msg = messages[1]
+ self.assertEquals("Authentication", msg["type"])
+ self.assertEquals("NT_STATUS_OK",
+ msg["Authentication"]["status"])
+ self.assertEquals("SMB",
+ msg["Authentication"]["serviceDescription"])
+ self.assertEquals("NTLMSSP",
+ msg["Authentication"]["authDescription"])
+ self.assertEquals("No-Password",
+ msg["Authentication"]["passwordType"])
+ self.assertEquals("ANONYMOUS LOGON",
+ msg["Authentication"]["becameAccount"])
+
+ def test_smb_no_krb_spnego(self):
+ def isLastExpectedMessage( msg):
+ return (msg["type"] == "Authorization" and
+ msg["Authorization"]["serviceDescription"] == "SMB" and
+ msg["Authorization"]["authType"] == "NTLMSSP" and
+ msg["Authorization"]["transportProtection"] == "SMB")
+
+ creds = self.insta_creds(template=self.get_credentials(),
+ kerberos_state=DONT_USE_KERBEROS)
+ smb.SMB(self.server,
+ "sysvol",
+ lp=self.get_loadparm(),
+ creds=creds)
+
+ messages = self.waitForMessages( isLastExpectedMessage)
+ self.assertEquals(2,
+ len(messages),
+ "Did not receive the expected number of messages")
+ # Check the first message it should be an Authentication
+ msg = messages[0]
+ self.assertEquals("Authentication", msg["type"])
+ self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+ self.assertEquals("SMB",
+ msg["Authentication"]["serviceDescription"])
+ self.assertEquals("NTLMSSP",
+ msg["Authentication"]["authDescription"])
+ self.assertEquals("NTLMv2",
+ msg["Authentication"]["passwordType"])
+
+ def test_smb_no_krb_spnego_bad_password(self):
+ def isLastExpectedMessage( msg):
+ return (msg["type"] == "Authentication" and
+ msg["Authentication"]["serviceDescription"] == "SMB" and
+ msg["Authentication"]["authDescription"] == "NTLMSSP" and
+ msg["Authentication"]["passwordType"] == "NTLMv2" and
+ msg["Authentication"]["status"]
+ == "NT_STATUS_WRONG_PASSWORD")
+
+ creds = self.insta_creds(template=self.get_credentials(),
+ kerberos_state=DONT_USE_KERBEROS)
+ creds.set_password("badPassword")
+
+ thrown = False
+ try:
+ smb.SMB(self.server,
+ "sysvol",
+ lp=self.get_loadparm(),
+ creds=creds)
+ except NTSTATUSError:
+ thrown = True
+ self.assertEquals( thrown, True)
+
+ messages = self.waitForMessages( isLastExpectedMessage)
+ self.assertEquals(1,
+ len(messages),
+ "Did not receive the expected number of messages")
+
+ def test_smb_no_krb_spnego_bad_user(self):
+ def isLastExpectedMessage( msg):
+ return (msg["type"] == "Authentication" and
+ msg["Authentication"]["serviceDescription"] == "SMB" and
+ msg["Authentication"]["authDescription"] == "NTLMSSP" and
+ msg["Authentication"]["passwordType"] == "NTLMv2" and
+ msg["Authentication"]["status"]
+ == "NT_STATUS_NO_SUCH_USER")
+
+ creds = self.insta_creds(template=self.get_credentials(),
+ kerberos_state=DONT_USE_KERBEROS)
+ creds.set_username("badUser")
+
+ thrown = False
+ try:
+ smb.SMB(self.server,
+ "sysvol",
+ lp=self.get_loadparm(),
+ creds=creds)
+ except NTSTATUSError:
+ thrown = True
+ self.assertEquals( thrown, True)
+
+ messages = self.waitForMessages( isLastExpectedMessage)
+ self.assertEquals(1,
+ len(messages),
+ "Did not receive the expected number of messages")
+
+ def test_smb_no_krb_no_spnego_no_ntlmv2(self):
+ def isLastExpectedMessage( msg):
+ return (msg["type"] == "Authorization" and
+ msg["Authorization"]["serviceDescription"] == "SMB" and
+ msg["Authorization"]["authType"] == "bare-NTLM" and
+ msg["Authorization"]["transportProtection"] == "SMB")
+
+ creds = self.insta_creds(template=self.get_credentials(),
+ kerberos_state=DONT_USE_KERBEROS)
+ smb.SMB(self.server,
+ "sysvol",
+ lp=self.get_loadparm(),
+ creds=creds,
+ ntlmv2_auth = False,
+ use_spnego = False )
+
+ messages = self.waitForMessages( isLastExpectedMessage)
+ self.assertEquals(2,
+ len(messages),
+ "Did not receive the expected number of messages")
+ # Check the first message it should be an Authentication
+ msg = messages[0]
+ self.assertEquals("Authentication", msg["type"])
+ self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+ self.assertEquals("SMB",
+ msg["Authentication"]["serviceDescription"])
+ self.assertEquals("bare-NTLM",
+ msg["Authentication"]["authDescription"])
+ self.assertEquals("NTLMv1",
+ msg["Authentication"]["passwordType"])
+
+ def test_smb_no_krb_no_spnego_no_ntlmv2_bad_password(self):
+ def isLastExpectedMessage( msg):
+ return (msg["type"] == "Authentication" and
+ msg["Authentication"]["serviceDescription"] == "SMB" and
+ msg["Authentication"]["authDescription"] == "bare-NTLM" and
+ msg["Authentication"]["passwordType"] == "NTLMv1" and
+ msg["Authentication"]["status"]
+ == "NT_STATUS_WRONG_PASSWORD")
+
+ creds = self.insta_creds(template=self.get_credentials(),
+ kerberos_state=DONT_USE_KERBEROS)
+ creds.set_password("badPassword")
+
+ thrown = False
+ try:
+ smb.SMB(self.server,
+ "sysvol",
+ lp=self.get_loadparm(),
+ creds=creds,
+ ntlmv2_auth = False,
+ use_spnego = False )
+ except NTSTATUSError:
+ thrown = True
+ self.assertEquals( thrown, True)
+
+
+ messages = self.waitForMessages( isLastExpectedMessage)
+ self.assertEquals(1,
+ len(messages),
+ "Did not receive the expected number of messages")
+
+ def test_smb_no_krb_no_spnego_no_ntlmv2_bad_user(self):
+ def isLastExpectedMessage( msg):
+ return (msg["type"] == "Authentication" and
+ msg["Authentication"]["serviceDescription"] == "SMB" and
+ msg["Authentication"]["authDescription"] == "bare-NTLM" and
+ msg["Authentication"]["passwordType"] == "NTLMv1" and
+ msg["Authentication"]["status"]
+ == "NT_STATUS_NO_SUCH_USER")
+
+ creds = self.insta_creds(template=self.get_credentials(),
+ kerberos_state=DONT_USE_KERBEROS)
+ creds.set_username("badUser")
+
+ thrown = False
+ try:
+ smb.SMB(self.server,
+ "sysvol",
+ lp=self.get_loadparm(),
+ creds=creds,
+ ntlmv2_auth = False,
+ use_spnego = False )
+ except NTSTATUSError:
+ thrown = True
+ self.assertEquals( thrown, True)
+
+
+ messages = self.waitForMessages( isLastExpectedMessage)
+ self.assertEquals(1,
+ len(messages),
+ "Did not receive the expected number of messages")
diff --git a/python/samba/tests/auth_log_base.py b/python/samba/tests/auth_log_base.py
new file mode 100644
index 00000000000..2b3ccfdbb4b
--- /dev/null
+++ b/python/samba/tests/auth_log_base.py
@@ -0,0 +1,102 @@
+# Unix SMB/CIFS implementation.
+# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+"""Tests for the Auth and AuthZ logging.
+"""
+
+from samba import auth
+import samba.tests
+from samba.messaging import Messaging
+from samba.dcerpc.messaging import MSG_AUTH_LOG, AUTH_EVENT_NAME
+from samba.dcerpc import srvsvc, dnsserver
+import time
+import json
+import os
+from samba import smb
+from samba.samdb import SamDB
+
+class AuthLogTestBase(samba.tests.TestCase):
+
+ def setUp(self):
+ super(AuthLogTestBase, self).setUp()
+ lp_ctx = self.get_loadparm()
+ self.msg_ctx = Messaging((1,), lp_ctx=lp_ctx);
+ self.msg_ctx.irpc_add_name(AUTH_EVENT_NAME)
+
+ def messageHandler( context, msgType, src, message):
+ # This does not look like sub unit output and it
+ # makes these tests much easier to debug.
+ print message
+ jsonMsg = json.loads(message)
+ context["messages"].append( jsonMsg)
+
+ self.context = { "messages": []}
+ self.msg_handler_and_context = (messageHandler, self.context)
+ self.msg_ctx.register(self.msg_handler_and_context,
+ msg_type=MSG_AUTH_LOG)
+
+ # Discard any previously queued messages.
+ self.msg_ctx.loop_once(0.001)
+ while len( self.context["messages"]):
+ self.msg_ctx.loop_once(0.001)
+ self.context["messages"] = []
+
+
+ self.remoteAddress = None
+ self.server = os.environ["SERVER"]
+ self.connection = None
+
+ def tearDown(self):
+ if self.msg_handler_and_context:
+ self.msg_ctx.deregister(self.msg_handler_and_context,
+ msg_type=MSG_AUTH_LOG)
+
+
+ def waitForMessages(self, isLastExpectedMessage, connection=None):
+
+ def completed( messages):
+ for message in messages:
+ if isRemote( message) and isLastExpectedMessage( message):
+ return True
+ return False
+
+ def isRemote( message):
+ remote = None
+ if message["type"] == "Authorization":
+ remote = message["Authorization"]["remoteAddress"]
+ elif message["type"] == "Authentication":
+ remote = message["Authentication"]["remoteAddress"]
+ else:
+ return False
+
+ try:
+ addr = remote.split(":")
+ return addr[1] == self.remoteAddress
+ except IndexError:
+ return False
+
+ self.connection = connection
+
+ start_time = time.time()
+ while not completed( self.context["messages"]):
+ self.msg_ctx.loop_once(0.1)
+ if time.time() - start_time > 1:
+ self.connection = None
+ return []
+
+ self.connection = None
+ return filter( isRemote, self.context["messages"])
diff --git a/python/samba/tests/auth_log_ncalrpc.py b/python/samba/tests/auth_log_ncalrpc.py
new file mode 100644
index 00000000000..2538c61c56e
--- /dev/null
+++ b/python/samba/tests/auth_log_ncalrpc.py
@@ -0,0 +1,104 @@
+# Unix SMB/CIFS implementation.
+# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+"""Tests for the Auth and AuthZ logging.
+"""
+
+from samba import auth
+import samba.tests
+from samba.messaging import Messaging
+from samba.dcerpc.messaging import MSG_AUTH_LOG, AUTH_EVENT_NAME
+from samba.dcerpc import samr
+import time
+import json
+import os
+from samba import smb
+from samba.samdb import SamDB
+import samba.tests.auth_log_base
+from samba.credentials import Credentials, DONT_USE_KERBEROS, MUST_USE_KERBEROS
+
+class AuthLogTestsNcalrpc(samba.tests.auth_log_base.AuthLogTestBase):
+
+ def setUp(self):
+ super(AuthLogTestsNcalrpc, self).setUp()
+ self.remoteAddress = "/root/ncalrpc_as_system"
+
+ def tearDown(self):
+ super(AuthLogTestsNcalrpc , self).tearDown()
+
+
+ def _test_rpc_ncaclrpc(self, authTypes, binding, creds,
+ protection, checkFunction):
+
+ def isLastExpectedMessage( msg):
+ return (
+ msg["type"] == "Authorization" and
+ msg["Authorization"]["serviceDescription"] == "DCE/RPC" and
+ msg["Authorization"]["authType"] == authTypes[0] and
+ msg["Authorization"]["transportProtection"] == protection
+ )
+
+ if binding:
+ binding = "[%s]" % binding
+
+ samr.samr("ncalrpc:%s" % binding, self.get_loadparm(), creds)
+ messages = self.waitForMessages( isLastExpectedMessage)
+ checkFunction(messages, authTypes, protection)
+
+ def rpc_ncacn_np_ntlm_check(self, messages, authTypes, protection):
+
+ expected_messages = len(authTypes)
+ self.assertEquals(expected_messages,
+ len(messages),
+ "Did not receive the expected number of messages")
+
+ # Check the first message it should be an Authorization
+ msg = messages[0]
+ self.assertEquals("Authorization", msg["type"])
+ self.assertEquals("DCE/RPC",
+ msg["Authorization"]["serviceDescription"])
+ self.assertEquals(authTypes[1], msg["Authorization"]["authType"])
+ self.assertEquals("NONE", msg["Authorization"]["transportProtection"])
+
+ # Check the second message it should be an Authentication
+ msg = messages[1]
+ self.assertEquals("Authentication", msg["type"])
+ self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
+ self.assertEquals("DCE/RPC",
+ msg["Authentication"]["serviceDescription"])
+ self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"])
+
+
+ def test_ncalrpc_ntlm_dns_sign(self):
+
+ creds = self.insta_creds(template=self.get_credentials(),
+ kerberos_state=DONT_USE_KERBEROS)
+ self._test_rpc_ncaclrpc(["NTLMSSP",
+ "ncalrpc",
+ "NTLMSSP"],
+ "", creds, "SIGN",
+ self.rpc_ncacn_np_ntlm_check)
+
+ def test_ncalrpc_ntlm_dns_seal(self):
+
+ creds = self.insta_creds(template=self.get_credentials(),
+ kerberos_state=DONT_USE_KERBEROS)
+ self._test_rpc_ncaclrpc(["NTLMSSP",
+ "ncalrpc",
+ "NTLMSSP"],
+ "seal", creds, "SEAL",
+ self.rpc_ncacn_np_ntlm_check)
diff --git a/selftest/knownfail b/selftest/knownfail
index b25038064c3..d7a10ef8396 100644
--- a/selftest/knownfail
+++ b/selftest/knownfail
@@ -319,3 +319,5 @@
^samba3.smb2.credits.skipped_mid.*
^samba4.blackbox.dbcheck-links.release-4-5-0-pre1.dangling_multi_valued_dbcheck
^samba4.blackbox.dbcheck-links.release-4-5-0-pre1.dangling_multi_valued_check_missing
+^samba.tests.auth_log.samba.tests.auth_log.AuthLogTests
+^samba.tests.auth_log_ncalrpc.samba.tests.auth_log_ncalrpc.AuthLogTestsNcalrpc
diff --git a/source4/selftest/tests.py b/source4/selftest/tests.py
index 890d41ed713..bef08c8b7b5 100755
--- a/source4/selftest/tests.py
+++ b/source4/selftest/tests.py
@@ -68,6 +68,7 @@ finally:
have_tls_support = ("ENABLE_GNUTLS" in config_hash)
have_heimdal_support = ("SAMBA4_USES_HEIMDAL" in config_hash)
+have_jansson_support = ("HAVE_JANSSON" in config_hash)
if have_tls_support:
for options in ['-U"$USERNAME%$PASSWORD"']:
@@ -588,6 +589,15 @@ planpythontestsuite("ad_dc:local", "samba.tests.samba_tool.dnscmd")
planpythontestsuite("ad_dc_ntvfs:local", "samba.tests.dcerpc.rpcecho")
planoldpythontestsuite("ad_dc_ntvfs:local", "samba.tests.dcerpc.registry", extra_args=['-U"$USERNAME%$PASSWORD"'])
planoldpythontestsuite("ad_dc_ntvfs", "samba.tests.dcerpc.dnsserver", extra_args=['-U"$USERNAME%$PASSWORD"'])
+if have_jansson_support:
+ planoldpythontestsuite("ad_dc:local", "samba.tests.auth_log", extra_args=['-U"$USERNAME%$PASSWORD"'],
+ environ={'CLIENT_IP': '127.0.0.11',
+ 'SOCKET_WRAPPER_DEFAULT_IFACE': 11})
+ planoldpythontestsuite("ad_dc_ntvfs:local", "samba.tests.auth_log", extra_args=['-U"$USERNAME%$PASSWORD"'],
+ environ={'CLIENT_IP': '127.0.0.11',
+ 'SOCKET_WRAPPER_DEFAULT_IFACE': 11})
+ planoldpythontestsuite("ad_dc_ntvfs:local", "samba.tests.auth_log_ncalrpc", extra_args=['-U"$USERNAME%$PASSWORD"'])
+ planoldpythontestsuite("ad_dc:local", "samba.tests.auth_log_ncalrpc", extra_args=['-U"$USERNAME%$PASSWORD"'])
planoldpythontestsuite("ad_dc", "samba.tests.dcerpc.dnsserver", extra_args=['-U"$USERNAME%$PASSWORD"'])
planoldpythontestsuite("ad_dc", "samba.tests.dcerpc.raw_protocol", extra_args=['-U"$USERNAME%$PASSWORD"'])
plantestsuite_loadlist("samba4.ldap.python(ad_dc_ntvfs)", "ad_dc_ntvfs", [python, os.path.join(samba4srcdir, "dsdb/tests/python/ldap.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT'])