diff options
author | Andrew Bartlett <abartlet@samba.org> | 2017-03-14 16:43:06 +1300 |
---|---|---|
committer | Andrew Bartlett <abartlet@samba.org> | 2017-03-29 02:37:25 +0200 |
commit | 3ee82de26df77f97abe1ca70c69f2b7c47421207 (patch) | |
tree | ddbd97d6d15e0e6de1ea3be115ccc655a439e925 | |
parent | 41f1da3a1ae0335ad485118c14394b98b9890abe (diff) | |
download | samba-3ee82de26df77f97abe1ca70c69f2b7c47421207.tar.gz |
auth_log: Add tests by listening for JSON messages over the message bus
Signed-off-by: Andrew Bartlett <abartlet@samba.org>
Signed-off-by: Gary Lockyer <gary@catalyst.net.nz>
Pair-programmed-by: Gary Lockyer <gary@catalyst.net.nz>
-rw-r--r-- | python/samba/tests/auth_log.py | 801 | ||||
-rw-r--r-- | python/samba/tests/auth_log_base.py | 102 | ||||
-rw-r--r-- | python/samba/tests/auth_log_ncalrpc.py | 104 | ||||
-rw-r--r-- | selftest/knownfail | 2 | ||||
-rwxr-xr-x | source4/selftest/tests.py | 10 |
5 files changed, 1019 insertions, 0 deletions
diff --git a/python/samba/tests/auth_log.py b/python/samba/tests/auth_log.py new file mode 100644 index 00000000000..abd2952fbb0 --- /dev/null +++ b/python/samba/tests/auth_log.py @@ -0,0 +1,801 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for the Auth and AuthZ logging. +""" + +from samba import auth +import samba.tests +from samba.messaging import Messaging +from samba.dcerpc.messaging import MSG_AUTH_LOG, AUTH_EVENT_NAME +from samba.dcerpc import srvsvc, dnsserver +import time +import json +import os +from samba import smb +from samba.samdb import SamDB +import samba.tests.auth_log_base +from samba.credentials import Credentials, DONT_USE_KERBEROS, MUST_USE_KERBEROS +from samba import NTSTATUSError +from subprocess import call + +class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase): + + def setUp(self): + super(AuthLogTests, self).setUp() + self.remoteAddress = os.environ["CLIENT_IP"] + + def tearDown(self): + super(AuthLogTests, self).tearDown() + + + + def _test_rpc_ncacn_np(self, authTypes, creds, service, + binding, protection, checkFunction): + def isLastExpectedMessage( msg): + return ( + msg["type"] == "Authorization" and + ( msg["Authorization"]["serviceDescription"] == "DCE/RPC" or + msg["Authorization"]["serviceDescription"] == service) and + msg["Authorization"]["authType"] == authTypes[0] and + msg["Authorization"]["transportProtection"] == protection + ) + + if binding: + binding = "[%s]" % binding + + if service == "dnsserver": + x = dnsserver.dnsserver("ncacn_np:%s%s" % (self.server, binding), + self.get_loadparm(), + creds) + elif service == "srvsvc": + x = srvsvc.srvsvc("ncacn_np:%s%s" % (self.server, binding), + self.get_loadparm(), + creds) + + # The connection is passed to ensure the server + # messaging context stays up until all the messages have been received. + messages = self.waitForMessages(isLastExpectedMessage, x) + checkFunction(messages, authTypes, service, binding, protection) + + def rpc_ncacn_np_ntlm_check(self, messages, authTypes, service, + binding, protection): + + expected_messages = len(authTypes) + self.assertEquals(expected_messages, + len(messages), + "Did not receive the expected number of messages") + + # Check the first message it should be an Authentication + msg = messages[0] + self.assertEquals("Authentication", msg["type"]) + self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) + self.assertEquals("SMB", + msg["Authentication"]["serviceDescription"]) + self.assertEquals(authTypes[1], msg["Authentication"]["authDescription"]) + + # Check the second message it should be an Authorization + msg = messages[1] + self.assertEquals("Authorization", msg["type"]) + self.assertEquals("SMB", + msg["Authorization"]["serviceDescription"]) + self.assertEquals(authTypes[2], msg["Authorization"]["authType"]) + self.assertEquals("SMB", msg["Authorization"]["transportProtection"]) + + # Check the third message it should be an Authentication + # if we are expecting 4 messages + if expected_messages == 4: + def checkServiceDescription( desc): + return (desc == "DCE/RPC" or desc == service) + + msg = messages[2] + self.assertEquals("Authentication", msg["type"]) + self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) + self.assertTrue( + checkServiceDescription( msg["Authentication"]["serviceDescription"])) + + self.assertEquals(authTypes[3], msg["Authentication"]["authDescription"]) + + def rpc_ncacn_np_krb5_check(self, messages, authTypes, service, binding, protection): + + expected_messages = len(authTypes) + self.assertEquals(expected_messages, + len(messages), + "Did not receive the expected number of messages") + + # Check the first message it should be an Authentication + # This is almost certainly Authentication over UDP, and is probably + # returning message too big, + msg = messages[0] + self.assertEquals("Authentication", msg["type"]) + self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) + self.assertEquals("Kerberos KDC", + msg["Authentication"]["serviceDescription"]) + self.assertEquals(authTypes[1], msg["Authentication"]["authDescription"]) + + # Check the second message it should be an Authentication + # This this the TCP Authentication in response to the message too big + # response to the UDP Authentication + msg = messages[1] + self.assertEquals("Authentication", msg["type"]) + self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) + self.assertEquals("Kerberos KDC", + msg["Authentication"]["serviceDescription"]) + self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"]) + + # Check the third message it should be an Authorization + msg = messages[2] + self.assertEquals("Authorization", msg["type"]) + serviceDescription = "SMB" + print "binding %s" % binding + if binding == "[smb2]": + serviceDescription = "SMB2" + + self.assertEquals(serviceDescription, + msg["Authorization"]["serviceDescription"]) + self.assertEquals(authTypes[3], msg["Authorization"]["authType"]) + self.assertEquals("SMB", msg["Authorization"]["transportProtection"]) + + + def test_rpc_ncacn_np_ntlm_dns_sign(self): + creds = self.insta_creds(template=self.get_credentials(), + kerberos_state=DONT_USE_KERBEROS) + self._test_rpc_ncacn_np(["NTLMSSP", + "NTLMSSP", + "NTLMSSP", + "NTLMSSP"], + creds, "dnsserver", "sign", "SIGN", + self.rpc_ncacn_np_ntlm_check) + + def test_rpc_ncacn_np_ntlm_srv_sign(self): + creds = self.insta_creds(template=self.get_credentials(), + kerberos_state=DONT_USE_KERBEROS) + self._test_rpc_ncacn_np(["NTLMSSP", + "NTLMSSP", + "NTLMSSP", + "NTLMSSP"], + creds, "srvsvc", "sign", "SIGN", + self.rpc_ncacn_np_ntlm_check) + + def test_rpc_ncacn_np_ntlm_dns(self): + creds = self.insta_creds(template=self.get_credentials(), + kerberos_state=DONT_USE_KERBEROS) + self._test_rpc_ncacn_np(["ncacn_np", + "NTLMSSP", + "NTLMSSP"], + creds, "dnsserver", "", "SMB", + self.rpc_ncacn_np_ntlm_check) + + def test_rpc_ncacn_np_ntlm_srv(self): + creds = self.insta_creds(template=self.get_credentials(), + kerberos_state=DONT_USE_KERBEROS) + self._test_rpc_ncacn_np(["ncacn_np", + "NTLMSSP", + "NTLMSSP"], + creds, "srvsvc", "", "SMB", + self.rpc_ncacn_np_ntlm_check) + + def test_rpc_ncacn_np_krb_dns_sign(self): + creds = self.insta_creds(template=self.get_credentials(), + kerberos_state=MUST_USE_KERBEROS) + self._test_rpc_ncacn_np(["krb5", + "ENC-TS Pre-authentication", + "ENC-TS Pre-authentication", + "krb5"], + creds, "dnsserver", "sign", "SIGN", + self.rpc_ncacn_np_krb5_check) + + def test_rpc_ncacn_np_krb_srv_sign(self): + creds = self.insta_creds(template=self.get_credentials(), + kerberos_state=MUST_USE_KERBEROS) + self._test_rpc_ncacn_np(["krb5", + "ENC-TS Pre-authentication", + "ENC-TS Pre-authentication", + "krb5"], + creds, "srvsvc", "sign", "SIGN", + self.rpc_ncacn_np_krb5_check) + + def test_rpc_ncacn_np_krb_dns(self): + creds = self.insta_creds(template=self.get_credentials(), + kerberos_state=MUST_USE_KERBEROS) + self._test_rpc_ncacn_np(["ncacn_np", + "ENC-TS Pre-authentication", + "ENC-TS Pre-authentication", + "krb5"], + creds, "dnsserver", "", "SMB", + self.rpc_ncacn_np_krb5_check) + + def test_rpc_ncacn_np_krb_dns_smb2(self): + creds = self.insta_creds(template=self.get_credentials(), + kerberos_state=MUST_USE_KERBEROS) + self._test_rpc_ncacn_np(["ncacn_np", + "ENC-TS Pre-authentication", + "ENC-TS Pre-authentication", + "krb5"], + creds, "dnsserver", "smb2", "SMB", + self.rpc_ncacn_np_krb5_check) + + def test_rpc_ncacn_np_krb_srv(self): + creds = self.insta_creds(template=self.get_credentials(), + kerberos_state=MUST_USE_KERBEROS) + self._test_rpc_ncacn_np(["ncacn_np", + "ENC-TS Pre-authentication", + "ENC-TS Pre-authentication", + "krb5"], + creds, "srvsvc", "", "SMB", + self.rpc_ncacn_np_krb5_check) + + def _test_rpc_ncacn_ip_tcp(self, authTypes, creds, service, + binding, protection, checkFunction): + def isLastExpectedMessage( msg): + return ( + msg["type"] == "Authorization" and + msg["Authorization"]["serviceDescription"] == "DCE/RPC" and + msg["Authorization"]["authType"] == authTypes[0] and + msg["Authorization"]["transportProtection"] == protection + ) + + if binding: + binding = "[%s]" % binding + + if service == "dnsserver": + dnsserver.dnsserver("ncacn_ip_tcp:%s%s" % (self.server, binding), + self.get_loadparm(), + creds) + elif service == "srvsvc": + srvsvc.srvsvc("ncacn_ip_tcp:%s%s" % (self.server, binding), + self.get_loadparm(), + creds) + + + messages = self.waitForMessages( isLastExpectedMessage) + checkFunction(messages, authTypes, service, binding, protection) + + def rpc_ncacn_ip_tcp_ntlm_check(self, messages, authTypes, service, + binding, protection): + + expected_messages = len(authTypes) + self.assertEquals(expected_messages, + len(messages), + "Did not receive the expected number of messages") + + # Check the first message it should be an Authorization + msg = messages[0] + self.assertEquals("Authorization", msg["type"]) + self.assertEquals("DCE/RPC", + msg["Authorization"]["serviceDescription"]) + self.assertEquals(authTypes[1], msg["Authorization"]["authType"]) + self.assertEquals("NONE", msg["Authorization"]["transportProtection"]) + + # Check the second message it should be an Authentication + msg = messages[1] + self.assertEquals("Authentication", msg["type"]) + self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) + self.assertEquals("DCE/RPC", + msg["Authentication"]["serviceDescription"]) + self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"]) + + def rpc_ncacn_ip_tcp_krb5_check(self, messages, authTypes, service, + binding, protection): + + expected_messages = len(authTypes) + self.assertEquals(expected_messages, + len(messages), + "Did not receive the expected number of messages") + + # Check the first message it should be an Authorization + msg = messages[0] + self.assertEquals("Authorization", msg["type"]) + self.assertEquals("DCE/RPC", + msg["Authorization"]["serviceDescription"]) + self.assertEquals(authTypes[1], msg["Authorization"]["authType"]) + self.assertEquals("NONE", msg["Authorization"]["transportProtection"]) + + # Check the second message it should be an Authentication + msg = messages[1] + self.assertEquals("Authentication", msg["type"]) + self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) + self.assertEquals("Kerberos KDC", + msg["Authentication"]["serviceDescription"]) + self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"]) + + # Check the third message it should be an Authentication + msg = messages[2] + self.assertEquals("Authentication", msg["type"]) + self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) + self.assertEquals("Kerberos KDC", + msg["Authentication"]["serviceDescription"]) + self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"]) + + def test_rpc_ncacn_ip_tcp_ntlm_dns_sign(self): + creds = self.insta_creds(template=self.get_credentials(), + kerberos_state=DONT_USE_KERBEROS) + self._test_rpc_ncacn_ip_tcp(["NTLMSSP", + "ncacn_ip_tcp", + "NTLMSSP"], + creds, "dnsserver", "sign", "SIGN", + self.rpc_ncacn_ip_tcp_ntlm_check) + + def test_rpc_ncacn_ip_tcp_krb5_dns_sign(self): + creds = self.insta_creds(template=self.get_credentials(), + kerberos_state=MUST_USE_KERBEROS) + self._test_rpc_ncacn_ip_tcp(["krb5", + "ncacn_ip_tcp", + "ENC-TS Pre-authentication", + "ENC-TS Pre-authentication"], + creds, "dnsserver", "sign", "SIGN", + self.rpc_ncacn_ip_tcp_krb5_check) + + def test_rpc_ncacn_ip_tcp_ntlm_dns(self): + creds = self.insta_creds(template=self.get_credentials(), + kerberos_state=DONT_USE_KERBEROS) + self._test_rpc_ncacn_ip_tcp(["NTLMSSP", + "ncacn_ip_tcp", + "NTLMSSP"], + creds, "dnsserver", "", "SIGN", + self.rpc_ncacn_ip_tcp_ntlm_check) + + def test_rpc_ncacn_ip_tcp_krb5_dns(self): + creds = self.insta_creds(template=self.get_credentials(), + kerberos_state=MUST_USE_KERBEROS) + self._test_rpc_ncacn_ip_tcp(["krb5", + "ncacn_ip_tcp", + "ENC-TS Pre-authentication", + "ENC-TS Pre-authentication"], + creds, "dnsserver", "", "SIGN", + self.rpc_ncacn_ip_tcp_krb5_check) + + def test_rpc_ncacn_ip_tcp_ntlm_dns_connect(self): + creds = self.insta_creds(template=self.get_credentials(), + kerberos_state=DONT_USE_KERBEROS) + self._test_rpc_ncacn_ip_tcp(["NTLMSSP", + "ncacn_ip_tcp", + "NTLMSSP"], + creds, "dnsserver", "connect", "NONE", + self.rpc_ncacn_ip_tcp_ntlm_check) + + def test_rpc_ncacn_ip_tcp_krb5_dns_connect(self): + creds = self.insta_creds(template=self.get_credentials(), + kerberos_state=MUST_USE_KERBEROS) + self._test_rpc_ncacn_ip_tcp(["krb5", + "ncacn_ip_tcp", + "ENC-TS Pre-authentication", + "ENC-TS Pre-authentication"], + creds, "dnsserver", "connect", "NONE", + self.rpc_ncacn_ip_tcp_krb5_check) + + def test_rpc_ncacn_ip_tcp_ntlm_dns_seal(self): + creds = self.insta_creds(template=self.get_credentials(), + kerberos_state=DONT_USE_KERBEROS) + self._test_rpc_ncacn_ip_tcp(["NTLMSSP", + "ncacn_ip_tcp", + "NTLMSSP"], + creds, "dnsserver", "seal", "SEAL", + self.rpc_ncacn_ip_tcp_ntlm_check) + + def test_rpc_ncacn_ip_tcp_krb5_dns_seal(self): + creds = self.insta_creds(template=self.get_credentials(), + kerberos_state=MUST_USE_KERBEROS) + self._test_rpc_ncacn_ip_tcp(["krb5", + "ncacn_ip_tcp", + "ENC-TS Pre-authentication", + "ENC-TS Pre-authentication"], + creds, "dnsserver", "seal", "SEAL", + self.rpc_ncacn_ip_tcp_krb5_check) + + def test_ldap(self): + + def isLastExpectedMessage( msg): + return (msg["type"] == "Authorization" and + msg["Authorization"]["serviceDescription"] == "LDAP" and + msg["Authorization"]["transportProtection"] == "SIGN" and + msg["Authorization"]["authType"] == "krb5") + + self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER"], + lp = self.get_loadparm(), + credentials=self.get_credentials()) + + messages = self.waitForMessages( isLastExpectedMessage) + self.assertEquals(3, + len(messages), + "Did not receive the expected number of messages") + + # Check the first message it should be an Authentication + msg = messages[0] + self.assertEquals("Authentication", msg["type"]) + self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) + self.assertEquals("Kerberos KDC", + msg["Authentication"]["serviceDescription"]) + self.assertEquals("ENC-TS Pre-authentication", + msg["Authentication"]["authDescription"]) + + # Check the first message it should be an Authentication + msg = messages[1] + self.assertEquals("Authentication", msg["type"]) + self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) + self.assertEquals("Kerberos KDC", + msg["Authentication"]["serviceDescription"]) + self.assertEquals("ENC-TS Pre-authentication", + msg["Authentication"]["authDescription"]) + + def test_ldap_ntlm(self): + + def isLastExpectedMessage( msg): + return (msg["type"] == "Authorization" and + msg["Authorization"]["serviceDescription"] == "LDAP" and + msg["Authorization"]["transportProtection"] == "SEAL" and + msg["Authorization"]["authType"] == "NTLMSSP") + + self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER_IP"], + lp = self.get_loadparm(), + credentials=self.get_credentials()) + + messages = self.waitForMessages( isLastExpectedMessage) + self.assertEquals(2, + len(messages), + "Did not receive the expected number of messages") + # Check the first message it should be an Authentication + msg = messages[0] + self.assertEquals("Authentication", msg["type"]) + self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) + self.assertEquals("LDAP", + msg["Authentication"]["serviceDescription"]) + self.assertEquals("NTLMSSP", msg["Authentication"]["authDescription"]) + + def test_ldap_simple_bind(self): + def isLastExpectedMessage( msg): + return (msg["type"] == "Authorization" and + msg["Authorization"]["serviceDescription"] == "LDAP" and + msg["Authorization"]["transportProtection"] == "TLS" and + msg["Authorization"]["authType"] == "simple bind") + + creds = self.insta_creds(template=self.get_credentials()) + creds.set_bind_dn("%s\\%s" % (creds.get_domain(), + creds.get_username())) + + self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"], + lp = self.get_loadparm(), + credentials=creds) + + messages = self.waitForMessages( isLastExpectedMessage) + self.assertEquals(2, + len(messages), + "Did not receive the expected number of messages") + + # Check the first message it should be an Authentication + msg = messages[0] + self.assertEquals("Authentication", msg["type"]) + self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) + self.assertEquals("LDAP", + msg["Authentication"]["serviceDescription"]) + self.assertEquals("simple bind", + msg["Authentication"]["authDescription"]) + + def test_smb(self): + def isLastExpectedMessage( msg): + return (msg["type"] == "Authorization" and + msg["Authorization"]["serviceDescription"] == "SMB" and + msg["Authorization"]["authType"] == "krb5" and + msg["Authorization"]["transportProtection"] == "SMB") + + creds = self.insta_creds(template=self.get_credentials()) + smb.SMB(self.server, + "sysvol", + lp=self.get_loadparm(), + creds=creds) + + messages = self.waitForMessages( isLastExpectedMessage) + self.assertEquals(3, + len(messages), + "Did not receive the expected number of messages") + # Check the first message it should be an Authentication + msg = messages[0] + self.assertEquals("Authentication", msg["type"]) + self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) + self.assertEquals("Kerberos KDC", + msg["Authentication"]["serviceDescription"]) + self.assertEquals("ENC-TS Pre-authentication", + msg["Authentication"]["authDescription"]) + + # Check the second message it should be an Authentication + msg = messages[1] + self.assertEquals("Authentication", msg["type"]) + self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) + self.assertEquals("Kerberos KDC", + msg["Authentication"]["serviceDescription"]) + self.assertEquals("ENC-TS Pre-authentication", + msg["Authentication"]["authDescription"]) + + def test_smb_bad_password(self): + def isLastExpectedMessage( msg): + return (msg["type"] == "Authentication" and + msg["Authentication"]["serviceDescription"] + == "Kerberos KDC" and + msg["Authentication"]["status"] + == "NT_STATUS_WRONG_PASSWORD" and + msg["Authentication"]["authDescription"] + == "ENC-TS Pre-authentication") + + creds = self.insta_creds(template=self.get_credentials()) + creds.set_password("badPassword") + + thrown = False + try: + smb.SMB(self.server, + "sysvol", + lp=self.get_loadparm(), + creds=creds) + except NTSTATUSError: + thrown = True + self.assertEquals( thrown, True) + + messages = self.waitForMessages( isLastExpectedMessage) + self.assertEquals(1, + len(messages), + "Did not receive the expected number of messages") + + + def test_smb_bad_user(self): + def isLastExpectedMessage( msg): + return (msg["type"] == "Authentication" and + msg["Authentication"]["serviceDescription"] + == "Kerberos KDC" and + msg["Authentication"]["status"] + == "NT_STATUS_NO_SUCH_USER" and + msg["Authentication"]["authDescription"] + == "ENC-TS Pre-authentication") + + creds = self.insta_creds(template=self.get_credentials()) + creds.set_username("badUser") + + thrown = False + try: + smb.SMB(self.server, + "sysvol", + lp=self.get_loadparm(), + creds=creds) + except NTSTATUSError: + thrown = True + self.assertEquals( thrown, True) + + messages = self.waitForMessages( isLastExpectedMessage) + self.assertEquals(1, + len(messages), + "Did not receive the expected number of messages") + + def test_smb_anonymous(self): + def isLastExpectedMessage( msg): + return (msg["type"] == "Authorization" and + msg["Authorization"]["serviceDescription"] == "SMB" and + msg["Authorization"]["authType"] == "NTLMSSP" and + msg["Authorization"]["account"] == "ANONYMOUS LOGON" and + msg["Authorization"]["transportProtection"] == "SMB") + + server = os.environ["SERVER"] + + path = "//%s/IPC$" % server + auth = "-N" + call(["bin/smbclient", path, auth, "-c quit"]) + + messages = self.waitForMessages( isLastExpectedMessage) + self.assertEquals(3, + len(messages), + "Did not receive the expected number of messages") + + # Check the first message it should be an Authentication + msg = messages[0] + self.assertEquals("Authentication", msg["type"]) + self.assertEquals("NT_STATUS_NO_SUCH_USER", + msg["Authentication"]["status"]) + self.assertEquals("SMB", + msg["Authentication"]["serviceDescription"]) + self.assertEquals("NTLMSSP", + msg["Authentication"]["authDescription"]) + self.assertEquals("No-Password", + msg["Authentication"]["passwordType"]) + + # Check the second message it should be an Authentication + msg = messages[1] + self.assertEquals("Authentication", msg["type"]) + self.assertEquals("NT_STATUS_OK", + msg["Authentication"]["status"]) + self.assertEquals("SMB", + msg["Authentication"]["serviceDescription"]) + self.assertEquals("NTLMSSP", + msg["Authentication"]["authDescription"]) + self.assertEquals("No-Password", + msg["Authentication"]["passwordType"]) + self.assertEquals("ANONYMOUS LOGON", + msg["Authentication"]["becameAccount"]) + + def test_smb_no_krb_spnego(self): + def isLastExpectedMessage( msg): + return (msg["type"] == "Authorization" and + msg["Authorization"]["serviceDescription"] == "SMB" and + msg["Authorization"]["authType"] == "NTLMSSP" and + msg["Authorization"]["transportProtection"] == "SMB") + + creds = self.insta_creds(template=self.get_credentials(), + kerberos_state=DONT_USE_KERBEROS) + smb.SMB(self.server, + "sysvol", + lp=self.get_loadparm(), + creds=creds) + + messages = self.waitForMessages( isLastExpectedMessage) + self.assertEquals(2, + len(messages), + "Did not receive the expected number of messages") + # Check the first message it should be an Authentication + msg = messages[0] + self.assertEquals("Authentication", msg["type"]) + self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) + self.assertEquals("SMB", + msg["Authentication"]["serviceDescription"]) + self.assertEquals("NTLMSSP", + msg["Authentication"]["authDescription"]) + self.assertEquals("NTLMv2", + msg["Authentication"]["passwordType"]) + + def test_smb_no_krb_spnego_bad_password(self): + def isLastExpectedMessage( msg): + return (msg["type"] == "Authentication" and + msg["Authentication"]["serviceDescription"] == "SMB" and + msg["Authentication"]["authDescription"] == "NTLMSSP" and + msg["Authentication"]["passwordType"] == "NTLMv2" and + msg["Authentication"]["status"] + == "NT_STATUS_WRONG_PASSWORD") + + creds = self.insta_creds(template=self.get_credentials(), + kerberos_state=DONT_USE_KERBEROS) + creds.set_password("badPassword") + + thrown = False + try: + smb.SMB(self.server, + "sysvol", + lp=self.get_loadparm(), + creds=creds) + except NTSTATUSError: + thrown = True + self.assertEquals( thrown, True) + + messages = self.waitForMessages( isLastExpectedMessage) + self.assertEquals(1, + len(messages), + "Did not receive the expected number of messages") + + def test_smb_no_krb_spnego_bad_user(self): + def isLastExpectedMessage( msg): + return (msg["type"] == "Authentication" and + msg["Authentication"]["serviceDescription"] == "SMB" and + msg["Authentication"]["authDescription"] == "NTLMSSP" and + msg["Authentication"]["passwordType"] == "NTLMv2" and + msg["Authentication"]["status"] + == "NT_STATUS_NO_SUCH_USER") + + creds = self.insta_creds(template=self.get_credentials(), + kerberos_state=DONT_USE_KERBEROS) + creds.set_username("badUser") + + thrown = False + try: + smb.SMB(self.server, + "sysvol", + lp=self.get_loadparm(), + creds=creds) + except NTSTATUSError: + thrown = True + self.assertEquals( thrown, True) + + messages = self.waitForMessages( isLastExpectedMessage) + self.assertEquals(1, + len(messages), + "Did not receive the expected number of messages") + + def test_smb_no_krb_no_spnego_no_ntlmv2(self): + def isLastExpectedMessage( msg): + return (msg["type"] == "Authorization" and + msg["Authorization"]["serviceDescription"] == "SMB" and + msg["Authorization"]["authType"] == "bare-NTLM" and + msg["Authorization"]["transportProtection"] == "SMB") + + creds = self.insta_creds(template=self.get_credentials(), + kerberos_state=DONT_USE_KERBEROS) + smb.SMB(self.server, + "sysvol", + lp=self.get_loadparm(), + creds=creds, + ntlmv2_auth = False, + use_spnego = False ) + + messages = self.waitForMessages( isLastExpectedMessage) + self.assertEquals(2, + len(messages), + "Did not receive the expected number of messages") + # Check the first message it should be an Authentication + msg = messages[0] + self.assertEquals("Authentication", msg["type"]) + self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) + self.assertEquals("SMB", + msg["Authentication"]["serviceDescription"]) + self.assertEquals("bare-NTLM", + msg["Authentication"]["authDescription"]) + self.assertEquals("NTLMv1", + msg["Authentication"]["passwordType"]) + + def test_smb_no_krb_no_spnego_no_ntlmv2_bad_password(self): + def isLastExpectedMessage( msg): + return (msg["type"] == "Authentication" and + msg["Authentication"]["serviceDescription"] == "SMB" and + msg["Authentication"]["authDescription"] == "bare-NTLM" and + msg["Authentication"]["passwordType"] == "NTLMv1" and + msg["Authentication"]["status"] + == "NT_STATUS_WRONG_PASSWORD") + + creds = self.insta_creds(template=self.get_credentials(), + kerberos_state=DONT_USE_KERBEROS) + creds.set_password("badPassword") + + thrown = False + try: + smb.SMB(self.server, + "sysvol", + lp=self.get_loadparm(), + creds=creds, + ntlmv2_auth = False, + use_spnego = False ) + except NTSTATUSError: + thrown = True + self.assertEquals( thrown, True) + + + messages = self.waitForMessages( isLastExpectedMessage) + self.assertEquals(1, + len(messages), + "Did not receive the expected number of messages") + + def test_smb_no_krb_no_spnego_no_ntlmv2_bad_user(self): + def isLastExpectedMessage( msg): + return (msg["type"] == "Authentication" and + msg["Authentication"]["serviceDescription"] == "SMB" and + msg["Authentication"]["authDescription"] == "bare-NTLM" and + msg["Authentication"]["passwordType"] == "NTLMv1" and + msg["Authentication"]["status"] + == "NT_STATUS_NO_SUCH_USER") + + creds = self.insta_creds(template=self.get_credentials(), + kerberos_state=DONT_USE_KERBEROS) + creds.set_username("badUser") + + thrown = False + try: + smb.SMB(self.server, + "sysvol", + lp=self.get_loadparm(), + creds=creds, + ntlmv2_auth = False, + use_spnego = False ) + except NTSTATUSError: + thrown = True + self.assertEquals( thrown, True) + + + messages = self.waitForMessages( isLastExpectedMessage) + self.assertEquals(1, + len(messages), + "Did not receive the expected number of messages") diff --git a/python/samba/tests/auth_log_base.py b/python/samba/tests/auth_log_base.py new file mode 100644 index 00000000000..2b3ccfdbb4b --- /dev/null +++ b/python/samba/tests/auth_log_base.py @@ -0,0 +1,102 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for the Auth and AuthZ logging. +""" + +from samba import auth +import samba.tests +from samba.messaging import Messaging +from samba.dcerpc.messaging import MSG_AUTH_LOG, AUTH_EVENT_NAME +from samba.dcerpc import srvsvc, dnsserver +import time +import json +import os +from samba import smb +from samba.samdb import SamDB + +class AuthLogTestBase(samba.tests.TestCase): + + def setUp(self): + super(AuthLogTestBase, self).setUp() + lp_ctx = self.get_loadparm() + self.msg_ctx = Messaging((1,), lp_ctx=lp_ctx); + self.msg_ctx.irpc_add_name(AUTH_EVENT_NAME) + + def messageHandler( context, msgType, src, message): + # This does not look like sub unit output and it + # makes these tests much easier to debug. + print message + jsonMsg = json.loads(message) + context["messages"].append( jsonMsg) + + self.context = { "messages": []} + self.msg_handler_and_context = (messageHandler, self.context) + self.msg_ctx.register(self.msg_handler_and_context, + msg_type=MSG_AUTH_LOG) + + # Discard any previously queued messages. + self.msg_ctx.loop_once(0.001) + while len( self.context["messages"]): + self.msg_ctx.loop_once(0.001) + self.context["messages"] = [] + + + self.remoteAddress = None + self.server = os.environ["SERVER"] + self.connection = None + + def tearDown(self): + if self.msg_handler_and_context: + self.msg_ctx.deregister(self.msg_handler_and_context, + msg_type=MSG_AUTH_LOG) + + + def waitForMessages(self, isLastExpectedMessage, connection=None): + + def completed( messages): + for message in messages: + if isRemote( message) and isLastExpectedMessage( message): + return True + return False + + def isRemote( message): + remote = None + if message["type"] == "Authorization": + remote = message["Authorization"]["remoteAddress"] + elif message["type"] == "Authentication": + remote = message["Authentication"]["remoteAddress"] + else: + return False + + try: + addr = remote.split(":") + return addr[1] == self.remoteAddress + except IndexError: + return False + + self.connection = connection + + start_time = time.time() + while not completed( self.context["messages"]): + self.msg_ctx.loop_once(0.1) + if time.time() - start_time > 1: + self.connection = None + return [] + + self.connection = None + return filter( isRemote, self.context["messages"]) diff --git a/python/samba/tests/auth_log_ncalrpc.py b/python/samba/tests/auth_log_ncalrpc.py new file mode 100644 index 00000000000..2538c61c56e --- /dev/null +++ b/python/samba/tests/auth_log_ncalrpc.py @@ -0,0 +1,104 @@ +# Unix SMB/CIFS implementation. +# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +"""Tests for the Auth and AuthZ logging. +""" + +from samba import auth +import samba.tests +from samba.messaging import Messaging +from samba.dcerpc.messaging import MSG_AUTH_LOG, AUTH_EVENT_NAME +from samba.dcerpc import samr +import time +import json +import os +from samba import smb +from samba.samdb import SamDB +import samba.tests.auth_log_base +from samba.credentials import Credentials, DONT_USE_KERBEROS, MUST_USE_KERBEROS + +class AuthLogTestsNcalrpc(samba.tests.auth_log_base.AuthLogTestBase): + + def setUp(self): + super(AuthLogTestsNcalrpc, self).setUp() + self.remoteAddress = "/root/ncalrpc_as_system" + + def tearDown(self): + super(AuthLogTestsNcalrpc , self).tearDown() + + + def _test_rpc_ncaclrpc(self, authTypes, binding, creds, + protection, checkFunction): + + def isLastExpectedMessage( msg): + return ( + msg["type"] == "Authorization" and + msg["Authorization"]["serviceDescription"] == "DCE/RPC" and + msg["Authorization"]["authType"] == authTypes[0] and + msg["Authorization"]["transportProtection"] == protection + ) + + if binding: + binding = "[%s]" % binding + + samr.samr("ncalrpc:%s" % binding, self.get_loadparm(), creds) + messages = self.waitForMessages( isLastExpectedMessage) + checkFunction(messages, authTypes, protection) + + def rpc_ncacn_np_ntlm_check(self, messages, authTypes, protection): + + expected_messages = len(authTypes) + self.assertEquals(expected_messages, + len(messages), + "Did not receive the expected number of messages") + + # Check the first message it should be an Authorization + msg = messages[0] + self.assertEquals("Authorization", msg["type"]) + self.assertEquals("DCE/RPC", + msg["Authorization"]["serviceDescription"]) + self.assertEquals(authTypes[1], msg["Authorization"]["authType"]) + self.assertEquals("NONE", msg["Authorization"]["transportProtection"]) + + # Check the second message it should be an Authentication + msg = messages[1] + self.assertEquals("Authentication", msg["type"]) + self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"]) + self.assertEquals("DCE/RPC", + msg["Authentication"]["serviceDescription"]) + self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"]) + + + def test_ncalrpc_ntlm_dns_sign(self): + + creds = self.insta_creds(template=self.get_credentials(), + kerberos_state=DONT_USE_KERBEROS) + self._test_rpc_ncaclrpc(["NTLMSSP", + "ncalrpc", + "NTLMSSP"], + "", creds, "SIGN", + self.rpc_ncacn_np_ntlm_check) + + def test_ncalrpc_ntlm_dns_seal(self): + + creds = self.insta_creds(template=self.get_credentials(), + kerberos_state=DONT_USE_KERBEROS) + self._test_rpc_ncaclrpc(["NTLMSSP", + "ncalrpc", + "NTLMSSP"], + "seal", creds, "SEAL", + self.rpc_ncacn_np_ntlm_check) diff --git a/selftest/knownfail b/selftest/knownfail index b25038064c3..d7a10ef8396 100644 --- a/selftest/knownfail +++ b/selftest/knownfail @@ -319,3 +319,5 @@ ^samba3.smb2.credits.skipped_mid.* ^samba4.blackbox.dbcheck-links.release-4-5-0-pre1.dangling_multi_valued_dbcheck ^samba4.blackbox.dbcheck-links.release-4-5-0-pre1.dangling_multi_valued_check_missing +^samba.tests.auth_log.samba.tests.auth_log.AuthLogTests +^samba.tests.auth_log_ncalrpc.samba.tests.auth_log_ncalrpc.AuthLogTestsNcalrpc diff --git a/source4/selftest/tests.py b/source4/selftest/tests.py index 890d41ed713..bef08c8b7b5 100755 --- a/source4/selftest/tests.py +++ b/source4/selftest/tests.py @@ -68,6 +68,7 @@ finally: have_tls_support = ("ENABLE_GNUTLS" in config_hash) have_heimdal_support = ("SAMBA4_USES_HEIMDAL" in config_hash) +have_jansson_support = ("HAVE_JANSSON" in config_hash) if have_tls_support: for options in ['-U"$USERNAME%$PASSWORD"']: @@ -588,6 +589,15 @@ planpythontestsuite("ad_dc:local", "samba.tests.samba_tool.dnscmd") planpythontestsuite("ad_dc_ntvfs:local", "samba.tests.dcerpc.rpcecho") planoldpythontestsuite("ad_dc_ntvfs:local", "samba.tests.dcerpc.registry", extra_args=['-U"$USERNAME%$PASSWORD"']) planoldpythontestsuite("ad_dc_ntvfs", "samba.tests.dcerpc.dnsserver", extra_args=['-U"$USERNAME%$PASSWORD"']) +if have_jansson_support: + planoldpythontestsuite("ad_dc:local", "samba.tests.auth_log", extra_args=['-U"$USERNAME%$PASSWORD"'], + environ={'CLIENT_IP': '127.0.0.11', + 'SOCKET_WRAPPER_DEFAULT_IFACE': 11}) + planoldpythontestsuite("ad_dc_ntvfs:local", "samba.tests.auth_log", extra_args=['-U"$USERNAME%$PASSWORD"'], + environ={'CLIENT_IP': '127.0.0.11', + 'SOCKET_WRAPPER_DEFAULT_IFACE': 11}) + planoldpythontestsuite("ad_dc_ntvfs:local", "samba.tests.auth_log_ncalrpc", extra_args=['-U"$USERNAME%$PASSWORD"']) + planoldpythontestsuite("ad_dc:local", "samba.tests.auth_log_ncalrpc", extra_args=['-U"$USERNAME%$PASSWORD"']) planoldpythontestsuite("ad_dc", "samba.tests.dcerpc.dnsserver", extra_args=['-U"$USERNAME%$PASSWORD"']) planoldpythontestsuite("ad_dc", "samba.tests.dcerpc.raw_protocol", extra_args=['-U"$USERNAME%$PASSWORD"']) plantestsuite_loadlist("samba4.ldap.python(ad_dc_ntvfs)", "ad_dc_ntvfs", [python, os.path.join(samba4srcdir, "dsdb/tests/python/ldap.py"), '$SERVER', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT']) |