diff options
author | Joseph Sutton <josephsutton@catalyst.net.nz> | 2022-06-07 17:35:35 +1200 |
---|---|---|
committer | Douglas Bagnall <dbagnall@samba.org> | 2022-07-28 22:47:37 +0000 |
commit | b41691d0e546795bda994d94091b8e0a03ab96d6 (patch) | |
tree | c7329e8e2486aec09f5ebf7a3689f6731e5c7edd /python | |
parent | d277700710dc118f61065ed9e16e08e76820b66a (diff) | |
download | samba-b41691d0e546795bda994d94091b8e0a03ab96d6.tar.gz |
CVE-2022-32743 tests/py_credentials: Add tests for setting dNSHostName with LogonGetDomainInfo()
Test that the value is properly validated, and that it can be set
regardless of rights on the account.
BUG: https://bugzilla.samba.org/show_bug.cgi?id=14833
Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
Diffstat (limited to 'python')
-rw-r--r-- | python/samba/tests/py_credentials.py | 281 |
1 files changed, 279 insertions, 2 deletions
diff --git a/python/samba/tests/py_credentials.py b/python/samba/tests/py_credentials.py index ecb8271b595..0c442b81f3f 100644 --- a/python/samba/tests/py_credentials.py +++ b/python/samba/tests/py_credentials.py @@ -18,6 +18,8 @@ from samba.tests import TestCase, delete_force import os +import ldb + import samba from samba.auth import system_session from samba.credentials import ( @@ -25,7 +27,7 @@ from samba.credentials import ( CLI_CRED_NTLMv2_AUTH, CLI_CRED_NTLM_AUTH, DONT_USE_KERBEROS) -from samba.dcerpc import netlogon, ntlmssp, srvsvc +from samba.dcerpc import lsa, netlogon, ntlmssp, security, srvsvc from samba.dcerpc.netlogon import ( netr_Authenticator, netr_WorkstationInformation, @@ -36,10 +38,11 @@ from samba.dsdb import ( UF_WORKSTATION_TRUST_ACCOUNT, UF_PASSWD_NOTREQD, UF_NORMAL_ACCOUNT) -from samba.ndr import ndr_pack +from samba.ndr import ndr_pack, ndr_unpack from samba.samdb import SamDB from samba import NTSTATUSError, ntstatus from samba.common import get_string +from samba.sd_utils import SDUtils import ctypes @@ -105,6 +108,280 @@ class PyCredentialsTests(TestCase): (authenticator, subsequent) = self.get_authenticator(c) self.do_NetrLogonGetDomainInfo(c, authenticator, subsequent) + # Test using LogonGetDomainInfo to update dNSHostName to an allowed value. + def test_set_dns_hostname_valid(self): + c = self.get_netlogon_connection() + authenticator, subsequent = self.get_authenticator(c) + + domain_hostname = self.ldb.domain_dns_name() + + new_dns_hostname = f'{self.machine_name}.{domain_hostname}' + new_dns_hostname = new_dns_hostname.encode('utf-8') + + query = netr_WorkstationInformation() + query.os_name = lsa.String('some OS') + query.dns_hostname = new_dns_hostname + + c.netr_LogonGetDomainInfo( + server_name=self.server, + computer_name=self.user_creds.get_workstation(), + credential=authenticator, + return_authenticator=subsequent, + level=1, + query=query) + + # Check the result. + + res = self.ldb.search(self.machine_dn, + scope=ldb.SCOPE_BASE, + attrs=['dNSHostName']) + self.assertEqual(1, len(res)) + + got_dns_hostname = res[0].get('dNSHostName', idx=0) + self.assertEqual(new_dns_hostname, got_dns_hostname) + + # Test using LogonGetDomainInfo to update dNSHostName to an allowed value, + # when we are denied the right to do so. + def test_set_dns_hostname_valid_denied(self): + c = self.get_netlogon_connection() + authenticator, subsequent = self.get_authenticator(c) + + res = self.ldb.search(self.machine_dn, + scope=ldb.SCOPE_BASE, + attrs=['objectSid']) + self.assertEqual(1, len(res)) + + machine_sid = ndr_unpack(security.dom_sid, + res[0].get('objectSid', idx=0)) + + sd_utils = SDUtils(self.ldb) + + # Deny Validated Write and Write Property. + mod = (f'(OD;;SWWP;{security.GUID_DRS_DNS_HOST_NAME};;' + f'{machine_sid})') + sd_utils.dacl_add_ace(self.machine_dn, mod) + + domain_hostname = self.ldb.domain_dns_name() + + new_dns_hostname = f'{self.machine_name}.{domain_hostname}' + new_dns_hostname = new_dns_hostname.encode('utf-8') + + query = netr_WorkstationInformation() + query.os_name = lsa.String('some OS') + query.dns_hostname = new_dns_hostname + + c.netr_LogonGetDomainInfo( + server_name=self.server, + computer_name=self.user_creds.get_workstation(), + credential=authenticator, + return_authenticator=subsequent, + level=1, + query=query) + + # Check the result. + + res = self.ldb.search(self.machine_dn, + scope=ldb.SCOPE_BASE, + attrs=['dNSHostName']) + self.assertEqual(1, len(res)) + + got_dns_hostname = res[0].get('dNSHostName', idx=0) + self.assertEqual(new_dns_hostname, got_dns_hostname) + + # Ensure we can't use LogonGetDomainInfo to update dNSHostName to an + # invalid value, even with Validated Write. + def test_set_dns_hostname_invalid_validated_write(self): + c = self.get_netlogon_connection() + authenticator, subsequent = self.get_authenticator(c) + + res = self.ldb.search(self.machine_dn, + scope=ldb.SCOPE_BASE, + attrs=['objectSid']) + self.assertEqual(1, len(res)) + + machine_sid = ndr_unpack(security.dom_sid, + res[0].get('objectSid', idx=0)) + + sd_utils = SDUtils(self.ldb) + + # Grant Validated Write. + mod = (f'(OA;;SW;{security.GUID_DRS_DNS_HOST_NAME};;' + f'{machine_sid})') + sd_utils.dacl_add_ace(self.machine_dn, mod) + + new_dns_hostname = b'invalid' + + query = netr_WorkstationInformation() + query.os_name = lsa.String('some OS') + query.dns_hostname = new_dns_hostname + + c.netr_LogonGetDomainInfo( + server_name=self.server, + computer_name=self.user_creds.get_workstation(), + credential=authenticator, + return_authenticator=subsequent, + level=1, + query=query) + + # Check the result. + + res = self.ldb.search(self.machine_dn, + scope=ldb.SCOPE_BASE, + attrs=['dNSHostName']) + self.assertEqual(1, len(res)) + + got_dns_hostname = res[0].get('dNSHostName', idx=0) + self.assertIsNone(got_dns_hostname) + + # Ensure we can't use LogonGetDomainInfo to update dNSHostName to an + # invalid value, even with Write Property. + def test_set_dns_hostname_invalid_write_property(self): + c = self.get_netlogon_connection() + authenticator, subsequent = self.get_authenticator(c) + + res = self.ldb.search(self.machine_dn, + scope=ldb.SCOPE_BASE, + attrs=['objectSid']) + self.assertEqual(1, len(res)) + + machine_sid = ndr_unpack(security.dom_sid, + res[0].get('objectSid', idx=0)) + + sd_utils = SDUtils(self.ldb) + + # Grant Write Property. + mod = (f'(OA;;WP;{security.GUID_DRS_DNS_HOST_NAME};;' + f'{machine_sid})') + sd_utils.dacl_add_ace(self.machine_dn, mod) + + new_dns_hostname = b'invalid' + + query = netr_WorkstationInformation() + query.os_name = lsa.String('some OS') + query.dns_hostname = new_dns_hostname + + c.netr_LogonGetDomainInfo( + server_name=self.server, + computer_name=self.user_creds.get_workstation(), + credential=authenticator, + return_authenticator=subsequent, + level=1, + query=query) + + # Check the result. + + res = self.ldb.search(self.machine_dn, + scope=ldb.SCOPE_BASE, + attrs=['dNSHostName']) + self.assertEqual(1, len(res)) + + got_dns_hostname = res[0].get('dNSHostName', idx=0) + self.assertIsNone(got_dns_hostname) + + # Show we can't use LogonGetDomainInfo to set the dNSHostName to just the + # machine name. + def test_set_dns_hostname_to_machine_name(self): + c = self.get_netlogon_connection() + authenticator, subsequent = self.get_authenticator(c) + + new_dns_hostname = self.machine_name.encode('utf-8') + + query = netr_WorkstationInformation() + query.os_name = lsa.String('some OS') + query.dns_hostname = new_dns_hostname + + c.netr_LogonGetDomainInfo( + server_name=self.server, + computer_name=self.user_creds.get_workstation(), + credential=authenticator, + return_authenticator=subsequent, + level=1, + query=query) + + # Check the result. + + res = self.ldb.search(self.machine_dn, + scope=ldb.SCOPE_BASE, + attrs=['dNSHostName']) + self.assertEqual(1, len(res)) + + got_dns_hostname = res[0].get('dNSHostName', idx=0) + self.assertIsNone(got_dns_hostname) + + # Show we can't use LogonGetDomainInfo to set dNSHostName with an invalid + # suffix. + def test_set_dns_hostname_invalid_suffix(self): + c = self.get_netlogon_connection() + authenticator, subsequent = self.get_authenticator(c) + + domain_hostname = self.ldb.domain_dns_name() + + new_dns_hostname = f'{self.machine_name}.foo.{domain_hostname}' + new_dns_hostname = new_dns_hostname.encode('utf-8') + + query = netr_WorkstationInformation() + query.os_name = lsa.String('some OS') + query.dns_hostname = new_dns_hostname + + c.netr_LogonGetDomainInfo( + server_name=self.server, + computer_name=self.user_creds.get_workstation(), + credential=authenticator, + return_authenticator=subsequent, + level=1, + query=query) + + # Check the result. + + res = self.ldb.search(self.machine_dn, + scope=ldb.SCOPE_BASE, + attrs=['dNSHostName']) + self.assertEqual(1, len(res)) + + got_dns_hostname = res[0].get('dNSHostName', idx=0) + self.assertIsNone(got_dns_hostname) + + # Test that setting the HANDLES_SPN_UPDATE flag inhibits the dNSHostName + # update, but other attributes are still updated. + def test_set_dns_hostname_with_flag(self): + c = self.get_netlogon_connection() + authenticator, subsequent = self.get_authenticator(c) + + domain_hostname = self.ldb.domain_dns_name() + + new_dns_hostname = f'{self.machine_name}.{domain_hostname}' + new_dns_hostname = new_dns_hostname.encode('utf-8') + + operating_system = 'some OS' + + query = netr_WorkstationInformation() + query.os_name = lsa.String(operating_system) + + query.dns_hostname = new_dns_hostname + query.workstation_flags = netlogon.NETR_WS_FLAG_HANDLES_SPN_UPDATE + + c.netr_LogonGetDomainInfo( + server_name=self.server, + computer_name=self.user_creds.get_workstation(), + credential=authenticator, + return_authenticator=subsequent, + level=1, + query=query) + + # Check the result. + + res = self.ldb.search(self.machine_dn, + scope=ldb.SCOPE_BASE, + attrs=['dNSHostName', + 'operatingSystem']) + self.assertEqual(1, len(res)) + + got_dns_hostname = res[0].get('dNSHostName', idx=0) + self.assertIsNone(got_dns_hostname) + + got_os = res[0].get('operatingSystem', idx=0) + self.assertEqual(operating_system.encode('utf-8'), got_os) + def test_SamLogonEx(self): c = self.get_netlogon_connection() |