#!/usr/bin/env python3 # Unix SMB/CIFS implementation. # Copyright (C) Stefan Metzmacher 2020 # Copyright (C) Catalyst.Net Ltd 2023 # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # import sys import os sys.path.insert(0, 'bin/python') os.environ['PYTHONUNBUFFERED'] = '1' import random import ldb from samba import dsdb, ntstatus from samba.dcerpc import netlogon, security from samba.ndr import ndr_pack import samba.tests.krb5.kcrypto as kcrypto from samba.tests.krb5.kdc_base_test import GroupType from samba.tests.krb5.kdc_tgs_tests import KdcTgsBaseTests from samba.tests.krb5.rfc4120_constants import ( FX_FAST_ARMOR_AP_REQUEST, KDC_ERR_BADOPTION, KDC_ERR_GENERIC, KDC_ERR_NEVER_VALID, KDC_ERR_POLICY, NT_PRINCIPAL, NT_SRV_INST, ) import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1 global_asn1_print = False global_hexdump = False HRES_SEC_E_INVALID_TOKEN = 0x80090308 HRES_SEC_E_LOGON_DENIED = 0x8009030C class AuthnPolicyTests(KdcTgsBaseTests): @classmethod def setUpClass(cls): super().setUpClass() cls._max_ticket_life = None cls._max_renew_life = None def setUp(self): super().setUp() self.do_asn1_print = global_asn1_print self.do_hexdump = global_hexdump def get_max_ticket_life(self): if self._max_ticket_life is None: self._fetch_default_lifetimes() return self._max_ticket_life def get_max_renew_life(self): if self._max_renew_life is None: self._fetch_default_lifetimes() return self._max_renew_life def _fetch_default_lifetimes(self): samdb = self.get_samdb() domain_policy_dn = samdb.get_default_basedn() domain_policy_dn.add_child('CN=Default Domain Policy,CN=System') res = samdb.search(domain_policy_dn, scope=ldb.SCOPE_BASE, attrs=['maxTicketAge', 'maxRenewAge']) self.assertEqual(1, len(res)) max_ticket_age = res[0].get('maxTicketAge', idx=0) max_renew_age = res[0].get('maxRenewAge', idx=0) if max_ticket_age is not None: max_ticket_age = int(max_ticket_age.decode('utf-8')) else: max_ticket_age = 10 if max_renew_age is not None: max_renew_age = int(max_renew_age.decode('utf-8')) else: max_renew_age = 7 type(self)._max_ticket_life = max_ticket_age * 60 * 60 type(self)._max_renew_life = max_renew_age * 24 * 60 * 60 # Get account credentials for testing. def _get_creds(self, account_type=KdcTgsBaseTests.AccountType.USER, member_of=None, protected=False, assigned_policy=None, assigned_silo=None, ntlm=False, spn=None, allowed_rodc=None, cached=True): opts = { 'kerberos_enabled': not ntlm, 'spn': spn, } members = () if protected: samdb = self.get_samdb() protected_users_group = (f'') members += (protected_users_group,) if member_of is not None: members += (member_of,) if assigned_policy is not None: opts['assigned_policy'] = str(assigned_policy) if assigned_silo is not None: opts['assigned_silo'] = str(assigned_silo) if allowed_rodc: opts['allowed_replication_mock'] = True opts['revealed_to_mock_rodc'] = True if members: opts['member_of'] = members return self.get_cached_creds(account_type=account_type, opts=opts, use_cache=cached) def test_authn_policy_tgt_lifetime_user(self): # Create an authentication policy with certain TGT lifetimes set. user_life = 111 computer_life = 222 service_life = 333 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=user_life, computer_tgt_lifetime=computer_life, service_tgt_lifetime=service_life) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Request a Kerberos ticket with a lifetime of two hours, and assert # that the actual lifetime matches the user lifetime set in the policy. till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=user_life, expected_renew_life=user_life) def test_authn_policy_tgt_lifetime_computer(self): user_life = 111 computer_life = 222 service_life = 333 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=user_life, computer_tgt_lifetime=computer_life, service_tgt_lifetime=service_life) # Create a computer account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Request a Kerberos ticket with a lifetime of two hours, and assert # that the actual lifetime matches the computer lifetime set in the # policy. till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=computer_life, expected_renew_life=computer_life) def test_authn_policy_tgt_lifetime_service(self): user_life = 111 computer_life = 222 service_life = 333 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=user_life, computer_tgt_lifetime=computer_life, service_tgt_lifetime=service_life) # Create a managed service account with the assigned policy. client_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy) # Request a Kerberos ticket with a lifetime of two hours, and assert # that the actual lifetime matches the service lifetime set in the # policy. till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=service_life, expected_renew_life=service_life) def test_authn_silo_tgt_lifetime_user(self): # Create an authentication policy with certain TGT lifetimes set. user_life = 111 computer_life = 222 service_life = 333 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=user_life, computer_tgt_lifetime=computer_life, service_tgt_lifetime=service_life) # Create a second policy with different lifetimes, so we can verify the # correct policy is enforced. wrong_policy_id = self.get_new_username() wrong_policy = self.create_authn_policy(wrong_policy_id, enforced=True, user_tgt_lifetime=444, computer_tgt_lifetime=555, service_tgt_lifetime=666) # Create an authentication silo with our existing policies. silo_id = self.get_new_username() silo = self.create_authn_silo(silo_id, user_policy=str(policy), computer_policy=str(wrong_policy), service_policy=str(wrong_policy), enforced=True) # Create a user account assigned to the silo. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_silo=silo) client_dn_str = str(client_creds.get_dn()) # Add the user to the silo as a member. self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers', expect_attr=False) # Request a Kerberos ticket with a lifetime of two hours, and assert # that the actual lifetime matches the user lifetime set in the # appropriate policy. till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=user_life, expected_renew_life=user_life) def test_authn_silo_tgt_lifetime_computer(self): user_life = 111 computer_life = 222 service_life = 333 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=user_life, computer_tgt_lifetime=computer_life, service_tgt_lifetime=service_life) wrong_policy_id = self.get_new_username() wrong_policy = self.create_authn_policy(wrong_policy_id, enforced=True, user_tgt_lifetime=444, computer_tgt_lifetime=555, service_tgt_lifetime=666) # Create an authentication silo with our existing policies. silo_id = self.get_new_username() silo = self.create_authn_silo(silo_id, user_policy=str(wrong_policy), computer_policy=str(policy), service_policy=str(wrong_policy), enforced=True) # Create a computer account assigned to the silo. client_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_silo=silo) client_dn_str = str(client_creds.get_dn()) # Add the computer to the silo as a member. self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers', expect_attr=False) # Request a Kerberos ticket with a lifetime of two hours, and assert # that the actual lifetime matches the computer lifetime set in the # appropriate policy. till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=computer_life, expected_renew_life=computer_life) def test_authn_silo_tgt_lifetime_service(self): user_life = 111 computer_life = 222 service_life = 333 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=user_life, computer_tgt_lifetime=computer_life, service_tgt_lifetime=service_life) wrong_policy_id = self.get_new_username() wrong_policy = self.create_authn_policy(wrong_policy_id, enforced=True, user_tgt_lifetime=444, computer_tgt_lifetime=555, service_tgt_lifetime=666) # Create an authentication silo with our existing policies. silo_id = self.get_new_username() silo = self.create_authn_silo(silo_id, user_policy=str(wrong_policy), computer_policy=str(wrong_policy), service_policy=str(policy), enforced=True) # Create a managed service account assigned to the silo. client_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_silo=silo) client_dn_str = str(client_creds.get_dn()) # Add the managed service account to the silo as a member. self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers', expect_attr=False) # Request a Kerberos ticket with a lifetime of two hours, and assert # that the actual lifetime matches the service lifetime set in the # appropriate policy. till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=service_life, expected_renew_life=service_life) # Test that an authentication silo takes priority over a policy assigned # directly. def test_authn_silo_and_policy_tgt_lifetime_user(self): # Create an authentication policy with certain TGT lifetimes set. user_life = 111 computer_life = 222 service_life = 333 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=user_life, computer_tgt_lifetime=computer_life, service_tgt_lifetime=service_life) # Create a second policy with different lifetimes, so we can verify the # correct policy is enforced. wrong_policy_id = self.get_new_username() wrong_policy = self.create_authn_policy(wrong_policy_id, enforced=True, user_tgt_lifetime=444, computer_tgt_lifetime=555, service_tgt_lifetime=666) # Create an authentication silo with our existing policies. silo_id = self.get_new_username() silo = self.create_authn_silo(silo_id, user_policy=str(policy), computer_policy=str(wrong_policy), service_policy=str(wrong_policy), enforced=True) # Create a user account assigned to the silo, and also to a policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_silo=silo, assigned_policy=wrong_policy) client_dn_str = str(client_creds.get_dn()) # Add the user to the silo as a member. self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers', expect_attr=False) # Request a Kerberos ticket with a lifetime of two hours, and assert # that the actual lifetime matches the user lifetime set in the # appropriate policy. till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=user_life, expected_renew_life=user_life) def test_authn_silo_and_policy_tgt_lifetime_computer(self): user_life = 111 computer_life = 222 service_life = 333 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=user_life, computer_tgt_lifetime=computer_life, service_tgt_lifetime=service_life) wrong_policy_id = self.get_new_username() wrong_policy = self.create_authn_policy(wrong_policy_id, enforced=True, user_tgt_lifetime=444, computer_tgt_lifetime=555, service_tgt_lifetime=666) # Create an authentication silo with our existing policies. silo_id = self.get_new_username() silo = self.create_authn_silo(silo_id, user_policy=str(wrong_policy), computer_policy=str(policy), service_policy=str(wrong_policy), enforced=True) # Create a computer account assigned to the silo, and also to a policy. client_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_silo=silo, assigned_policy=wrong_policy) client_dn_str = str(client_creds.get_dn()) # Add the computer to the silo as a member. self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers', expect_attr=False) # Request a Kerberos ticket with a lifetime of two hours, and assert # that the actual lifetime matches the computer lifetime set in the # appropriate policy. till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=computer_life, expected_renew_life=computer_life) def test_authn_silo_and_policy_tgt_lifetime_service(self): user_life = 111 computer_life = 222 service_life = 333 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=user_life, computer_tgt_lifetime=computer_life, service_tgt_lifetime=service_life) wrong_policy_id = self.get_new_username() wrong_policy = self.create_authn_policy(wrong_policy_id, enforced=True, user_tgt_lifetime=444, computer_tgt_lifetime=555, service_tgt_lifetime=666) # Create an authentication silo with our existing policies. silo_id = self.get_new_username() silo = self.create_authn_silo(silo_id, user_policy=str(wrong_policy), computer_policy=str(wrong_policy), service_policy=str(policy), enforced=True) # Create a managed service account assigned to the silo, and also to a # policy. client_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_silo=silo, assigned_policy=wrong_policy) client_dn_str = str(client_creds.get_dn()) # Add the managed service account to the silo as a member. self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers', expect_attr=False) # Request a Kerberos ticket with a lifetime of two hours, and assert # that the actual lifetime matches the service lifetime set in the # appropriate policy. till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=service_life, expected_renew_life=service_life) def test_authn_policy_tgt_lifetime_max(self): # Create an authentication policy with the maximum allowable TGT # lifetime set. INT64_MAX = 0x7fff_ffff_ffff_ffff max_lifetime = INT64_MAX // 10_000_000 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=max_lifetime) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Request a Kerberos ticket with a ‘till’ time far in the # future, and assert that the actual lifetime is the maximum # allowed by the Default Domain policy. till = '99991231235959Z' expected_lifetime = self.get_max_ticket_life() tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=expected_lifetime, expected_renew_life=expected_lifetime) def test_authn_policy_tgt_lifetime_min(self): # Create an authentication policy with the minimum allowable TGT # lifetime set. INT64_MIN = -0x8000_0000_0000_0000 min_lifetime = round(INT64_MIN / 10_000_000) policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=min_lifetime) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Request a Kerberos ticket with a lifetime of two hours. The request # should fail with a NEVER_VALID error. till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours self._get_tgt(client_creds, till=till, expected_error=KDC_ERR_NEVER_VALID, expect_status=True, expected_status=ntstatus.NT_STATUS_TIME_DIFFERENCE_AT_DC) def test_authn_policy_tgt_lifetime_zero(self): # Create an authentication policy with the TGT lifetime set to zero. lifetime = 0 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=lifetime) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Request a Kerberos ticket with a ‘till’ time far in the # future. Assert that the actual lifetime is the maximum # allowed by the Default Domain Policy till = '99991231235959Z' expected_lifetime = self.get_max_ticket_life() expected_renew_life = self.get_max_renew_life() tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=expected_lifetime, expected_renew_life=expected_renew_life) def test_authn_policy_tgt_lifetime_one_second(self): # Create an authentication policy with the TGT lifetime set to one # second. lifetime = 1 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=lifetime) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Request a Kerberos ticket with a lifetime of two hours, and assert # that the actual lifetime matches the user lifetime set in the # appropriate policy. till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=lifetime, expected_renew_life=lifetime) def test_authn_policy_tgt_lifetime_kpasswd_lifetime(self): # Create an authentication policy with the TGT lifetime set to two # minutes (the lifetime of a kpasswd ticket). lifetime = 2 * 60 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=lifetime) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Request a Kerberos ticket with a lifetime of two hours, and assert # that the actual lifetime matches the user lifetime set in the # appropriate policy. till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=lifetime, expected_renew_life=lifetime) def test_authn_policy_tgt_lifetime_short_protected(self): # Create an authentication policy with a short TGT lifetime set. lifetime = 111 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=lifetime) # Create a user account with the assigned policy, belonging to the # Protected Users group. client_creds = self._get_creds(account_type=self.AccountType.USER, protected=True, assigned_policy=policy) # Request a Kerberos ticket with a lifetime of two hours, and assert # that the actual lifetime matches the user lifetime set in the policy. till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=lifetime, expected_renew_life=lifetime) def test_authn_policy_tgt_lifetime_long_protected(self): # Create an authentication policy with a long TGT lifetime set. This # exceeds the lifetime of four hours enforced by Protected Users. lifetime = 6 * 60 * 60 # 6 hours policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=lifetime) # Create a user account with the assigned policy, belonging to the # Protected Users group. client_creds = self._get_creds(account_type=self.AccountType.USER, protected=True, assigned_policy=policy) # Request a Kerberos ticket with a lifetime of eight hours, and assert # that the actual lifetime matches the user lifetime set in the policy, # taking precedence over the lifetime enforced by Protected Users. till = self.get_KerberosTime(offset=8 * 60 * 60) # 8 hours tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=lifetime, expected_renew_life=lifetime) def test_authn_policy_tgt_lifetime_zero_protected(self): # Create an authentication policy with the TGT lifetime set to zero. policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=0) # Create a user account with the assigned policy, belonging to the # Protected Users group. client_creds = self._get_creds(account_type=self.AccountType.USER, protected=True, assigned_policy=policy) # Request a Kerberos ticket with a lifetime of six hours, and assert # that the actual lifetime is the four hours enforced by Protected # Users. till = self.get_KerberosTime(offset=6 * 60 * 60) # 6 hours tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=4 * 60 * 60, expected_renew_life=4 * 60 * 60) def test_authn_policy_tgt_lifetime_none_protected(self): # Create an authentication policy with no TGT lifetime set. policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True) # Create a user account with the assigned policy, belonging to the # Protected Users group. client_creds = self._get_creds(account_type=self.AccountType.USER, protected=True, assigned_policy=policy) # Request a Kerberos ticket with a lifetime of six hours, and assert # that the actual lifetime is the four hours enforced by Protected # Users. till = self.get_KerberosTime(offset=6 * 60 * 60) # 6 hours tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=4 * 60 * 60, expected_renew_life=4 * 60 * 60) def test_authn_policy_tgt_lifetime_unenforced_protected(self): # Create an unenforced authentication policy with a TGT lifetime set. lifetime = 123 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=False, user_tgt_lifetime=lifetime) # Create a user account with the assigned policy, belonging to the # Protected Users group. client_creds = self._get_creds(account_type=self.AccountType.USER, protected=True, assigned_policy=policy) # Request a Kerberos ticket with a lifetime of six hours, and assert # that the actual lifetime is the four hours enforced by Protected # Users. till = self.get_KerberosTime(offset=6 * 60 * 60) # 6 hours tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=4 * 60 * 60, expected_renew_life=4 * 60 * 60) def test_authn_policy_not_enforced(self): # Create an authentication policy with the TGT lifetime set. The policy # is not enforced. lifetime = 123 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, user_tgt_lifetime=lifetime) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Request a Kerberos ticket with a ‘till’ time far in the # future. Assert that the actual lifetime is the maximum allowed by # the Default Domain Policy. till = '99991231235959Z' expected_lifetime = self.get_max_ticket_life() expected_renew_life = self.get_max_renew_life() tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=expected_lifetime, expected_renew_life=expected_renew_life) def test_authn_policy_unenforced(self): # Create an authentication policy with the TGT lifetime set. The policy # is set to be unenforced. lifetime = 123 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=False, user_tgt_lifetime=lifetime) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Request a Kerberos ticket with a ‘till’ time far in the # future. Assert that the actual lifetime is the maximum allowed by # the Default Domain Policy. till = '99991231235959Z' expected_lifetime = self.get_max_ticket_life() expected_renew_life = self.get_max_renew_life() tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=expected_lifetime, expected_renew_life=expected_renew_life) def test_authn_silo_not_enforced(self): # Create an authentication policy with the TGT lifetime set. lifetime = 123 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=lifetime) # Create an authentication silo with our existing policy. The silo is # not enforced. silo_id = self.get_new_username() silo = self.create_authn_silo(silo_id, user_policy=str(policy)) # Create a user account assigned to the silo. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_silo=silo) client_dn_str = str(client_creds.get_dn()) # Add the user to the silo as a member. self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers', expect_attr=False) # Request a Kerberos ticket with a ‘till’ time far in the # future. Assert that the actual lifetime is the maximum allowed by # the Default Domain Policy. till = '99991231235959Z' expected_lifetime = self.get_max_ticket_life() expected_renew_life = self.get_max_renew_life() tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=expected_lifetime, expected_renew_life=expected_renew_life) def test_authn_silo_unenforced(self): # Create an authentication policy with the TGT lifetime set. lifetime = 123 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=lifetime) # Create an authentication silo with our existing policy. The silo is # set to be unenforced. silo_id = self.get_new_username() silo = self.create_authn_silo(silo_id, user_policy=str(policy), enforced=False) # Create a user account assigned to the silo. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_silo=silo) client_dn_str = str(client_creds.get_dn()) # Add the user to the silo as a member. self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers', expect_attr=False) # Request a Kerberos ticket with a ‘till’ time far in the # future. Assert that the actual lifetime is the maximum allowed by # the Default Domain Policy. till = '99991231235959Z' expected_lifetime = self.get_max_ticket_life() expected_renew_life = self.get_max_renew_life() tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=expected_lifetime, expected_renew_life=expected_renew_life) def test_authn_silo_not_enforced_policy(self): # Create an authentication policy with the TGT lifetime set. The policy # is not enforced. lifetime = 123 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, user_tgt_lifetime=lifetime) # Create an authentication silo with our existing policy. silo_id = self.get_new_username() silo = self.create_authn_silo(silo_id, user_policy=str(policy), enforced=True) # Create a user account assigned to the silo. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_silo=silo) client_dn_str = str(client_creds.get_dn()) # Add the user to the silo as a member. self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers', expect_attr=False) # Request a Kerberos ticket with a lifetime of two hours. Despite the # fact that the policy is unenforced, the actual lifetime matches the # user lifetime set in the appropriate policy. till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=lifetime, expected_renew_life=lifetime) def test_authn_silo_unenforced_policy(self): # Create an authentication policy with the TGT lifetime set. The policy # is set to be unenforced. lifetime = 123 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=False, user_tgt_lifetime=lifetime) # Create an authentication silo with our existing policy. silo_id = self.get_new_username() silo = self.create_authn_silo(silo_id, user_policy=str(policy), enforced=True) # Create a user account assigned to the silo. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_silo=silo) client_dn_str = str(client_creds.get_dn()) # Add the user to the silo as a member. self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers', expect_attr=False) # Request a Kerberos ticket with a lifetime of two hours. Despite the # fact that the policy is unenforced, the actual lifetime matches the # user lifetime set in the appropriate policy. till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=lifetime, expected_renew_life=lifetime) def test_authn_silo_not_enforced_and_assigned_policy(self): # Create an authentication policy with the TGT lifetime set. silo_lifetime = 123 silo_policy_id = self.get_new_username() silo_policy = self.create_authn_policy(silo_policy_id, enforced=True, user_tgt_lifetime=silo_lifetime) # Create an authentication silo with our existing policy. The silo is # not enforced. silo_id = self.get_new_username() silo = self.create_authn_silo(silo_id, user_policy=str(silo_policy)) # Create a second policy with a different lifetime, so we can verify # the correct policy is enforced. lifetime = 456 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=lifetime) # Create a user account assigned to the silo, and also to the policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_silo=silo, assigned_policy=policy) client_dn_str = str(client_creds.get_dn()) # Add the user to the silo as a member. self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers', expect_attr=False) # Request a Kerberos ticket with a ‘till’ time far in the # future. Assert that the actual lifetime is the maximum # allowed by the Default Domain Policy. The directly-assigned # policy is not enforced. till = '99991231235959Z' expected_lifetime = self.get_max_ticket_life() expected_renew_life = self.get_max_renew_life() tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=expected_lifetime, expected_renew_life=expected_renew_life) def test_authn_silo_unenforced_and_assigned_policy(self): # Create an authentication policy with the TGT lifetime set. silo_lifetime = 123 silo_policy_id = self.get_new_username() silo_policy = self.create_authn_policy(silo_policy_id, enforced=True, user_tgt_lifetime=silo_lifetime) # Create an authentication silo with our existing policy. The silo is # set to be unenforced. silo_id = self.get_new_username() silo = self.create_authn_silo(silo_id, user_policy=str(silo_policy), enforced=False) # Create a second policy with a different lifetime, so we can verify # the correct policy is enforced. lifetime = 456 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=lifetime) # Create a user account assigned to the silo, and also to the policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_silo=silo, assigned_policy=policy) client_dn_str = str(client_creds.get_dn()) # Add the user to the silo as a member. self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers', expect_attr=False) # Request a Kerberos ticket with a ‘till’ time far in the # future. Assert that the actual lifetime is the maximum # allowed by the Default Domain Policy. The directly-assigned # policy is not enforced. till = '99991231235959Z' expected_lifetime = self.get_max_ticket_life() expected_renew_life = self.get_max_renew_life() tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=expected_lifetime, expected_renew_life=expected_renew_life) def test_authn_silo_not_enforced_policy_and_assigned_policy(self): # Create an authentication policy with the TGT lifetime set. The policy # is not enforced. silo_lifetime = 123 silo_policy_id = self.get_new_username() silo_policy = self.create_authn_policy(silo_policy_id, user_tgt_lifetime=silo_lifetime) # Create an authentication silo with our existing policy. silo_id = self.get_new_username() silo = self.create_authn_silo(silo_id, user_policy=str(silo_policy), enforced=True) # Create a second policy with a different lifetime, so we can verify # the correct policy is enforced. lifetime = 456 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=lifetime) # Create a user account assigned to the silo, and also to the policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_silo=silo, assigned_policy=policy) client_dn_str = str(client_creds.get_dn()) # Add the user to the silo as a member. self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers', expect_attr=False) # Request a Kerberos ticket with a lifetime of two hours. Despite the # fact that the policy is unenforced, the actual lifetime matches the # user lifetime set in the appropriate policy. The directly-assigned # policy is not enforced. till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=silo_lifetime, expected_renew_life=silo_lifetime) def test_authn_silo_unenforced_policy_and_assigned_policy(self): # Create an authentication policy with the TGT lifetime set. The policy # is set to be unenforced. silo_lifetime = 123 silo_policy_id = self.get_new_username() silo_policy = self.create_authn_policy(silo_policy_id, enforced=False, user_tgt_lifetime=silo_lifetime) # Create an authentication silo with our existing policy. silo_id = self.get_new_username() silo = self.create_authn_silo(silo_id, user_policy=str(silo_policy), enforced=True) # Create a second policy with a different lifetime, so we can verify # the correct policy is enforced. lifetime = 456 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=lifetime) # Create a user account assigned to the silo, and also to the policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_silo=silo, assigned_policy=policy) client_dn_str = str(client_creds.get_dn()) # Add the user to the silo as a member. self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers', expect_attr=False) # Request a Kerberos ticket with a lifetime of two hours. Despite the # fact that the policy is unenforced, the actual lifetime matches the # user lifetime set in the appropriate policy. The directly-assigned # policy is not enforced. till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=silo_lifetime, expected_renew_life=silo_lifetime) def test_authn_silo_not_a_member(self): # Create an authentication policy with the TGT lifetime set. lifetime = 123 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=lifetime) # Create an authentication silo with our existing policy. silo_id = self.get_new_username() silo = self.create_authn_silo(silo_id, user_policy=str(policy), enforced=True) # Create a user account assigned to the silo. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_silo=silo) # Do not add the user to the silo as a member. # Request a Kerberos ticket with a ‘till’ time far in the # future. Assert that the actual lifetime is the maximum allowed by # the Default Domain Policy. till = '99991231235959Z' expected_lifetime = self.get_max_ticket_life() expected_renew_life = self.get_max_renew_life() tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=expected_lifetime, expected_renew_life=expected_renew_life) def test_authn_silo_not_a_member_and_assigned_policy(self): # Create an authentication policy with the TGT lifetime set. silo_lifetime = 123 silo_policy_id = self.get_new_username() silo_policy = self.create_authn_policy(silo_policy_id, enforced=True, user_tgt_lifetime=silo_lifetime) # Create an authentication silo with our existing policy. silo_id = self.get_new_username() silo = self.create_authn_silo(silo_id, user_policy=str(silo_policy), enforced=True) # Create a second policy with a different lifetime, so we can verify # the correct policy is enforced. lifetime = 456 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=lifetime) # Create a user account assigned to the silo, and also to the policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_silo=silo, assigned_policy=policy) # Do not add the user to the silo as a member. # Request a Kerberos ticket with a lifetime of two hours, and assert # that the actual lifetime matches the user lifetime set in the # directly-assigned policy. till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=lifetime, expected_renew_life=lifetime) def test_authn_silo_not_assigned(self): # Create an authentication policy with the TGT lifetime set. lifetime = 123 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=lifetime) # Create an authentication silo with our existing policies. silo_id = self.get_new_username() silo = self.create_authn_silo(silo_id, user_policy=str(policy), enforced=True) # Create a user account, but don’t assign it to the silo. client_creds = self.get_cached_creds( account_type=self.AccountType.USER) client_dn_str = str(client_creds.get_dn()) # Add the user to the silo as a member. self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers', expect_attr=False) # Request a Kerberos ticket with a ‘till’ time far in the # future. Assert that the actual lifetime is the maximum allowed by # the Default Domain Policy. till = '99991231235959Z' expected_lifetime = self.get_max_ticket_life() expected_renew_life = self.get_max_renew_life() tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=expected_lifetime, expected_renew_life=expected_renew_life) def test_authn_silo_not_assigned_and_assigned_policy(self): # Create an authentication policy with the TGT lifetime set. lifetime = 123 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=lifetime) # Create an authentication silo with our existing policies. silo_id = self.get_new_username() silo = self.create_authn_silo(silo_id, user_policy=str(policy), enforced=True) # Create a second policy with a different lifetime, so we can verify # the correct policy is enforced. lifetime = 456 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=lifetime) # Create a user account assigned to the policy, but not to the silo. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) client_dn_str = str(client_creds.get_dn()) # Add the user to the silo as a member. self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers', expect_attr=False) # Request a Kerberos ticket with a lifetime of two hours, and assert # that the actual lifetime matches the user lifetime set in the # directly-assigned policy. till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=lifetime, expected_renew_life=lifetime) def test_authn_silo_no_applicable_policy(self): # Create an authentication policy with the TGT lifetime set. user_life = 111 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=user_life) # Create an authentication silo containing no policies. silo_id = self.get_new_username() silo = self.create_authn_silo(silo_id, enforced=True) # Create a user account assigned to the silo, and also to a policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_silo=silo, assigned_policy=policy) client_dn_str = str(client_creds.get_dn()) # Add the user to the silo as a member. self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers', expect_attr=False) # Request a Kerberos ticket with a ‘till’ time far in the # future, and assert that the actual lifetime is the maximum # allowed by the Default Domain Policy. till = '99991231235959Z' expected_lifetime = self.get_max_ticket_life() expected_renew_life = self.get_max_renew_life() tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=expected_lifetime, expected_renew_life=expected_renew_life) def test_authn_silo_no_tgt_lifetime(self): # Create an authentication policy with no TGT lifetime set. silo_policy_id = self.get_new_username() silo_policy = self.create_authn_policy(silo_policy_id, enforced=True) # Create a second policy with a lifetime set, so we can verify the # correct policy is enforced. policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=456) # Create an authentication silo with our existing policy. silo_id = self.get_new_username() silo = self.create_authn_silo(silo_id, user_policy=str(silo_policy), enforced=True) # Create a user account assigned to the silo, and also to a policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_silo=silo, assigned_policy=policy) client_dn_str = str(client_creds.get_dn()) # Add the user to the silo as a member. self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers', expect_attr=False) # Request a Kerberos ticket with a ‘till’ time far in the # future, and assert that the actual lifetime is the maximum # allowed by the Default Domain Policy. till = '99991231235959Z' expected_lifetime = self.get_max_ticket_life() expected_renew_life = self.get_max_renew_life() tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=expected_lifetime, expected_renew_life=expected_renew_life) def test_not_a_policy(self): # Create a user account with the assigned policy set to something that # isn’t a policy. samdb = self.get_samdb() client_creds = self._get_creds( account_type=self.AccountType.USER, assigned_policy=samdb.get_default_basedn()) # Request a Kerberos ticket with a ‘till’ time far in the # future, and assert that the actual lifetime is the maximum # allowed by the Default Domain Policy. till = '99991231235959Z' expected_lifetime = self.get_max_ticket_life() expected_renew_life = self.get_max_renew_life() tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=expected_lifetime, expected_renew_life=expected_renew_life) def test_not_a_silo(self): # Create a user account assigned to a silo that isn’t a silo. samdb = self.get_samdb() client_creds = self._get_creds( account_type=self.AccountType.USER, assigned_silo=samdb.get_default_basedn()) # Request a Kerberos ticket with a ‘till’ time far in the # future, and assert that the actual lifetime is the maximum # allowed by the Default Domain Policy. till = '99991231235959Z' expected_lifetime = self.get_max_ticket_life() expected_renew_life = self.get_max_renew_life() tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=expected_lifetime, expected_renew_life=expected_renew_life) def test_not_a_silo_and_policy(self): # Create an authentication policy with the TGT lifetime set. user_life = 123 policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_tgt_lifetime=user_life) # Create a user account assigned to a silo that isn’t a silo, and also # to a policy. samdb = self.get_samdb() client_creds = self._get_creds( account_type=self.AccountType.USER, assigned_silo=samdb.get_default_basedn(), assigned_policy=policy) # Request a Kerberos ticket with a lifetime of two hours, and assert # that the actual lifetime matches the user lifetime set in the # directly-assigned policy. till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours tgt = self._get_tgt(client_creds, till=till) self.check_ticket_times(tgt, expected_life=user_life, expected_renew_life=user_life) def test_authn_policy_allowed_from_empty(self): # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create an authentication policy with no DACL in the security # descriptor. allowed_from = 'O:SY' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=allowed_from) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Show that we can authenticate using an armor ticket. self._get_tgt(client_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_from_user_allow(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) mach_sid = self.get_objectSid(samdb, mach_creds.get_dn()) # Create an authentication policy that explicitly allows the machine # account for a user. allowed = f'O:SYD:(A;;CR;;;{mach_sid})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=allowed, service_allowed_from=denied) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Show that we can authenticate using an armor ticket. self._get_tgt(client_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_from_user_deny(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) mach_sid = self.get_objectSid(samdb, mach_creds.get_dn()) # Create an authentication policy that explicitly denies the machine # account for a user. allowed = 'O:SYD:(A;;CR;;;WD)' denied = f'O:SYD:(D;;CR;;;{mach_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=denied, service_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Show that we get a policy error when trying to authenticate. self._get_tgt(client_creds, armor_tgt=mach_tgt, expected_error=KDC_ERR_POLICY) def test_authn_policy_allowed_from_service_allow(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) mach_sid = self.get_objectSid(samdb, mach_creds.get_dn()) # Create an authentication policy that explicitly allows the machine # account for a service. allowed = f'O:SYD:(A;;CR;;;{mach_sid})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=denied, service_allowed_from=allowed) # Create a managed service account with the assigned policy. client_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy) # Show that we can authenticate using an armor ticket. self._get_tgt(client_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_from_service_deny(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) mach_sid = self.get_objectSid(samdb, mach_creds.get_dn()) # Create an authentication policy that explicitly denies the machine # account for a service. allowed = 'O:SYD:(A;;CR;;;WD)' denied = f'O:SYD:(D;;CR;;;{mach_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=allowed, service_allowed_from=denied) # Create a managed service account with the assigned policy. client_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy) # Show that we get a policy error when trying to authenticate. self._get_tgt(client_creds, armor_tgt=mach_tgt, expected_error=KDC_ERR_POLICY) def test_authn_policy_allowed_from_no_owner(self): # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create an authentication policy that explicitly allows the machine # account for a user. Omit the owner (O:SY) from the SDDL. allowed = 'D:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Show that we get a generic error if the security descriptor lacks an # owner. self._get_tgt(client_creds, armor_tgt=mach_tgt, expected_error=KDC_ERR_GENERIC) def test_authn_policy_allowed_from_no_owner_unenforced(self): # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create an unenforced authentication policy that explicitly allows the # machine account for a user. Omit the owner (O:SY) from the SDDL. allowed = 'D:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=False, user_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Show that we don’t get an error if the policy is unenforced. self._get_tgt(client_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_from_owner_self(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) mach_sid = self.get_objectSid(samdb, mach_creds.get_dn()) # Create an authentication policy that explicitly allows the machine # account for a user. Set the owner to the machine account. allowed = f'O:{mach_sid}D:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Show that we can authenticate using an armor ticket. self._get_tgt(client_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_from_owner_anon(self): # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create an authentication policy that explicitly allows the machine # account for a user. Set the owner to be anonymous. allowed = 'O:AND:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Show that we can authenticate using an armor ticket. self._get_tgt(client_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_from_no_fast(self): # Create an authentication policy that restricts authentication. allowed_from = 'O:SY' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=allowed_from) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Show that we cannot authenticate without using an armor ticket. self._get_tgt(client_creds, expected_error=KDC_ERR_POLICY, expect_status=True, expected_status=ntstatus.NT_STATUS_INVALID_WORKSTATION) def test_authn_policy_allowed_from_user_allow_group_not_a_member(self): samdb = self.get_samdb() # Create a new group. group_name = self.get_new_username() group_dn = self.create_group(samdb, group_name) group_sid = self.get_objectSid(samdb, group_dn) # Create a machine account with which to perform FAST and which does # not belong to the group. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create an authentication policy that allows accounts belonging to the # group. allowed = f'O:SYD:(A;;CR;;;{group_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Show that we get a policy error, as the machine account does not # belong to the group. self._get_tgt(client_creds, armor_tgt=mach_tgt, expected_error=KDC_ERR_POLICY) def test_authn_policy_allowed_from_user_allow_group_member(self): samdb = self.get_samdb() # Create a new group. group_name = self.get_new_username() group_dn = self.create_group(samdb, group_name) group_sid = self.get_objectSid(samdb, group_dn) # Create a machine account with which to perform FAST that belongs to # the group. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER, opts={'member_of': (group_dn,)}) mach_tgt = self.get_tgt(mach_creds) # Create an authentication policy that allows accounts belonging to the # group. allowed = f'O:SYD:(A;;CR;;;{group_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Show that we can authenticate using an armor ticket, since the # machine account belongs to the group. self._get_tgt(client_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_from_user_allow_domain_local_group(self): samdb = self.get_samdb() # Create a new domain-local group. group_name = self.get_new_username() group_dn = self.create_group(samdb, group_name, gtype=GroupType.DOMAIN_LOCAL.value) group_sid = self.get_objectSid(samdb, group_dn) # Create a machine account with which to perform FAST that belongs to # the group. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER, opts={'member_of': (group_dn,)}) mach_tgt = self.get_tgt(mach_creds) # Create an authentication policy that allows accounts belonging to the # group. allowed = f'O:SYD:(A;;CR;;;{group_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Show that the groups in the armor ticket are expanded to include the # domain-local group. self._get_tgt(client_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_from_user_allow_asserted_identity(self): # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create an authentication policy that allows accounts with the # Authentication Authority Asserted Identity SID. allowed = ( f'O:SYD:(A;;CR;;;' f'{security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY})' ) policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Show that authentication is allowed. self._get_tgt(client_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_from_user_allow_claims_valid(self): # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create an authentication policy that allows accounts with the # Claims Valid SID. allowed = f'O:SYD:(A;;CR;;;{security.SID_CLAIMS_VALID})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Show that authentication is allowed. self._get_tgt(client_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_from_user_allow_compounded_auth(self): # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create an authentication policy that allows accounts with the # Compounded Authentication SID. allowed = f'O:SYD:(A;;CR;;;{security.SID_COMPOUNDED_AUTHENTICATION})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Show that authentication is denied. self._get_tgt(client_creds, armor_tgt=mach_tgt, expected_error=KDC_ERR_POLICY) def test_authn_policy_allowed_from_user_allow_authenticated_users(self): # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create an authentication policy that allows accounts with the # Authenticated Users SID. allowed = f'O:SYD:(A;;CR;;;{security.SID_NT_AUTHENTICATED_USERS})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Show that authentication is allowed. self._get_tgt(client_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_from_user_allow_ntlm_authn(self): # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create an authentication policy that allows accounts with the NTLM # Authentication SID. allowed = f'O:SYD:(A;;CR;;;{security.SID_NT_NTLM_AUTHENTICATION})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Show that authentication is denied. self._get_tgt(client_creds, armor_tgt=mach_tgt, expected_error=KDC_ERR_POLICY) def test_authn_policy_allowed_from_user_allow_from_rodc(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self._get_creds(account_type=self.AccountType.COMPUTER, allowed_rodc=True) # Modify the TGT to be issued by an RODC. mach_tgt = self.issued_by_rodc(self.get_tgt(mach_creds)) mach_sid = self.get_objectSid(samdb, mach_creds.get_dn()) # Create an authentication policy that explicitly allows the machine # account for a user. allowed = f'O:SYD:(A;;CR;;;{mach_sid})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=allowed, service_allowed_from=denied) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Show that we can authenticate using an armor ticket. self._get_tgt(client_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_from_user_deny_from_rodc(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self._get_creds(account_type=self.AccountType.COMPUTER, allowed_rodc=True) # Modify the TGT to be issued by an RODC. mach_tgt = self.issued_by_rodc(self.get_tgt(mach_creds)) mach_sid = self.get_objectSid(samdb, mach_creds.get_dn()) # Create an authentication policy that explicitly denies the machine # account for a user. allowed = 'O:SYD:(A;;CR;;;WD)' denied = f'O:SYD:(D;;CR;;;{mach_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=denied, service_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Show that we get a policy error when trying to authenticate. self._get_tgt(client_creds, armor_tgt=mach_tgt, expected_error=KDC_ERR_POLICY) def test_authn_policy_allowed_from_service_allow_from_rodc(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self._get_creds(account_type=self.AccountType.COMPUTER, allowed_rodc=True) # Modify the TGT to be issued by an RODC. mach_tgt = self.issued_by_rodc(self.get_tgt(mach_creds)) mach_sid = self.get_objectSid(samdb, mach_creds.get_dn()) # Create an authentication policy that explicitly allows the machine # account for a service. allowed = f'O:SYD:(A;;CR;;;{mach_sid})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=denied, service_allowed_from=allowed) # Create a managed service account with the assigned policy. client_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy) # Show that we can authenticate using an armor ticket. self._get_tgt(client_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_from_service_deny_from_rodc(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self._get_creds(account_type=self.AccountType.COMPUTER, allowed_rodc=True) # Modify the TGT to be issued by an RODC. mach_tgt = self.issued_by_rodc(self.get_tgt(mach_creds)) mach_sid = self.get_objectSid(samdb, mach_creds.get_dn()) # Create an authentication policy that explicitly denies the machine # account for a service. allowed = 'O:SYD:(A;;CR;;;WD)' denied = f'O:SYD:(D;;CR;;;{mach_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=allowed, service_allowed_from=denied) # Create a managed service account with the assigned policy. client_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy) # Show that we get a policy error when trying to authenticate. self._get_tgt(client_creds, armor_tgt=mach_tgt, expected_error=KDC_ERR_POLICY) def test_authn_policy_allowed_from_user_allow_group_not_a_member_from_rodc(self): samdb = self.get_samdb() # Create a new group. group_name = self.get_new_username() group_dn = self.create_group(samdb, group_name) group_sid = self.get_objectSid(samdb, group_dn) # Create a machine account with which to perform FAST and which does # not belong to the group. mach_creds = self._get_creds(account_type=self.AccountType.COMPUTER, allowed_rodc=True) # Modify the TGT to be issued by an RODC. mach_tgt = self.issued_by_rodc(self.get_tgt(mach_creds)) # Create an authentication policy that allows accounts belonging to the # group. allowed = f'O:SYD:(A;;CR;;;{group_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Show that we get a policy error, as the machine account does not # belong to the group. self._get_tgt(client_creds, armor_tgt=mach_tgt, expected_error=KDC_ERR_POLICY) def test_authn_policy_allowed_from_user_allow_group_member_from_rodc(self): samdb = self.get_samdb() # Create a new group. group_name = self.get_new_username() group_dn = self.create_group(samdb, group_name) group_sid = self.get_objectSid(samdb, group_dn) # Create a machine account with which to perform FAST that belongs to # the group. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER, opts={'member_of': (group_dn,), 'allowed_replication_mock': True, 'revealed_to_mock_rodc': True}) # Modify the TGT to be issued by an RODC. mach_tgt = self.issued_by_rodc(self.get_tgt(mach_creds)) # Create an authentication policy that allows accounts belonging to the # group. allowed = f'O:SYD:(A;;CR;;;{group_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Show that we can authenticate using an armor ticket, since the # machine account belongs to the group. self._get_tgt(client_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_from_user_allow_domain_local_group_from_rodc(self): samdb = self.get_samdb() # Create a new domain-local group. group_name = self.get_new_username() group_dn = self.create_group(samdb, group_name, gtype=GroupType.DOMAIN_LOCAL.value) group_sid = self.get_objectSid(samdb, group_dn) # Create a machine account with which to perform FAST that belongs to # the group. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER, opts={'member_of': (group_dn,), 'allowed_replication_mock': True, 'revealed_to_mock_rodc': True}) # Modify the TGT to be issued by an RODC. mach_tgt = self.issued_by_rodc(self.get_tgt(mach_creds)) # Create an authentication policy that allows accounts belonging to the # group. allowed = f'O:SYD:(A;;CR;;;{group_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Show that the groups in the armor ticket are expanded to include the # domain-local group. self._get_tgt(client_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_from_user_allow_asserted_identity_from_rodc(self): # Create a machine account with which to perform FAST. mach_creds = self._get_creds(account_type=self.AccountType.COMPUTER, allowed_rodc=True) # Modify the TGT to be issued by an RODC. mach_tgt = self.issued_by_rodc(self.get_tgt(mach_creds)) # Create an authentication policy that allows accounts with the # Authentication Authority Asserted Identity SID. allowed = ( f'O:SYD:(A;;CR;;;' f'{security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY})' ) policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Show that authentication is allowed. self._get_tgt(client_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_from_user_allow_claims_valid_from_rodc(self): # Create a machine account with which to perform FAST. mach_creds = self._get_creds(account_type=self.AccountType.COMPUTER, allowed_rodc=True) # Modify the TGT to be issued by an RODC. mach_tgt = self.issued_by_rodc(self.get_tgt(mach_creds)) # Create an authentication policy that allows accounts with the # Claims Valid SID. allowed = f'O:SYD:(A;;CR;;;{security.SID_CLAIMS_VALID})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Show that authentication is allowed. self._get_tgt(client_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_from_user_allow_compounded_authn_from_rodc(self): # Create a machine account with which to perform FAST. mach_creds = self._get_creds(account_type=self.AccountType.COMPUTER, allowed_rodc=True) # Modify the TGT to be issued by an RODC. mach_tgt = self.issued_by_rodc(self.get_tgt(mach_creds)) # Create an authentication policy that allows accounts with the # Compounded Authentication SID. allowed = f'O:SYD:(A;;CR;;;{security.SID_COMPOUNDED_AUTHENTICATION})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Show that authentication is denied. self._get_tgt(client_creds, armor_tgt=mach_tgt, expected_error=KDC_ERR_POLICY) def test_authn_policy_allowed_from_user_allow_authenticated_users_from_rodc(self): # Create a machine account with which to perform FAST. mach_creds = self._get_creds(account_type=self.AccountType.COMPUTER, allowed_rodc=True) # Modify the TGT to be issued by an RODC. mach_tgt = self.issued_by_rodc(self.get_tgt(mach_creds)) # Create an authentication policy that allows accounts with the # Authenticated Users SID. allowed = f'O:SYD:(A;;CR;;;{security.SID_NT_AUTHENTICATED_USERS})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Show that authentication is allowed. self._get_tgt(client_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_from_user_allow_ntlm_authn_from_rodc(self): # Create a machine account with which to perform FAST. mach_creds = self._get_creds(account_type=self.AccountType.COMPUTER, allowed_rodc=True) # Modify the TGT to be issued by an RODC. mach_tgt = self.issued_by_rodc(self.get_tgt(mach_creds)) # Create an authentication policy that allows accounts with the NTLM # Authentication SID. allowed = f'O:SYD:(A;;CR;;;{security.SID_NT_NTLM_AUTHENTICATION})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy) # Show that authentication is denied. self._get_tgt(client_creds, armor_tgt=mach_tgt, expected_error=KDC_ERR_POLICY) def test_authn_policy_allowed_from_user_deny_user(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) mach_sid = self.get_objectSid(samdb, mach_creds.get_dn()) # Create a user account. client_creds = self.get_cached_creds(account_type=self.AccountType.USER, use_cache=False) client_dn = client_creds.get_dn() client_sid = self.get_objectSid(samdb, client_dn) # Create an authentication policy that explicitly allows the machine # account for a user, while denying the user account itself. allowed = f'O:SYD:(A;;CR;;;{mach_sid})(D;;CR;;;{client_sid})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=allowed, service_allowed_from=denied) # Assign the policy to the user account. self.add_attribute(samdb, str(client_dn), 'msDS-AssignedAuthNPolicy', str(policy)) # Show that authentication is allowed. self._get_tgt(client_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_to_empty(self): # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self.get_cached_creds( account_type=self.AccountType.USER) tgt = self.get_tgt(client_creds) # Create an authentication policy with no DACL in the security # descriptor. allowed_to = 'O:SY' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, computer_allowed_to=allowed_to) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that authentication is allowed. self._tgs_req(tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_to_computer_allow(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self.get_cached_creds( account_type=self.AccountType.USER) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) tgt = self.get_tgt(client_creds) # Create an authentication policy that applies to a computer and # explicitly allows the user account to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{client_sid})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=allowed, service_allowed_to=denied) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that obtaining a service ticket is allowed. self._tgs_req(tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_to_computer_deny(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self.get_cached_creds( account_type=self.AccountType.USER) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) tgt = self.get_tgt(client_creds) # Create an authentication policy that applies to a computer and # explicitly denies the user account to obtain a service ticket. denied = f'O:SYD:(D;;CR;;;{client_sid})' allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=allowed, computer_allowed_to=denied, service_allowed_to=allowed) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that obtaining a service ticket is denied. self._tgs_req( tgt, KDC_ERR_POLICY, client_creds, target_creds, armor_tgt=mach_tgt, expect_edata=self.expect_padata_outer, # We aren’t particular about whether or not we get an NTSTATUS. expect_status=None, expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED, check_patypes=False) def test_authn_policy_allowed_to_computer_allow_but_deny_mach(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) mach_sid = self.get_objectSid(samdb, mach_creds.get_dn()) # Create a user account. client_creds = self.get_cached_creds( account_type=self.AccountType.USER) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) tgt = self.get_tgt(client_creds) # Create an authentication policy that applies to a computer and # explicitly allows the user account to obtain a service ticket, while # explicitly denying the machine account. allowed = f'O:SYD:(A;;CR;;;{client_sid})(D;;CR;;;{mach_sid})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=allowed, service_allowed_to=denied) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Despite the documentation’s claims that the machine account is also # access-checked, obtaining a service ticket is allowed. self._tgs_req(tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_to_computer_allow_mach(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) mach_sid = self.get_objectSid(samdb, mach_creds.get_dn()) # Create a user account. client_creds = self.get_cached_creds( account_type=self.AccountType.USER) tgt = self.get_tgt(client_creds) # Create an authentication policy that applies to a computer and # explicitly allows the machine account to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{mach_sid})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=allowed, service_allowed_to=denied) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that obtaining a service ticket is denied. self._tgs_req( tgt, KDC_ERR_POLICY, client_creds, target_creds, armor_tgt=mach_tgt, expect_edata=self.expect_padata_outer, # We aren’t particular about whether or not we get an NTSTATUS. expect_status=None, expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED, check_patypes=False) def test_authn_policy_allowed_no_fast(self): samdb = self.get_samdb() # Create a user account. client_creds = self.get_cached_creds( account_type=self.AccountType.USER) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) tgt = self.get_tgt(client_creds) # Create an authentication policy that applies to a computer and # explicitly allows the user account to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{client_sid})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=allowed, service_allowed_to=denied) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that obtaining a service ticket is allowed without an armor TGT. self._tgs_req(tgt, 0, client_creds, target_creds) def test_authn_policy_denied_no_fast(self): samdb = self.get_samdb() # Create a user account. client_creds = self.get_cached_creds( account_type=self.AccountType.USER) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) tgt = self.get_tgt(client_creds) # Create an authentication policy that applies to a computer and # explicitly disallows the user account to obtain a service ticket. denied = f'O:SYD:(D;;CR;;;{client_sid})' allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=allowed, computer_allowed_to=denied, service_allowed_to=allowed) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that obtaining a service ticket is not allowed. self._tgs_req( tgt, KDC_ERR_POLICY, client_creds, target_creds, expect_edata=self.expect_padata_outer, expect_status=True, expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED) def test_authn_policy_allowed_to_computer_allow_asserted_identity(self): # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self.get_cached_creds( account_type=self.AccountType.USER) tgt = self.get_tgt(client_creds) # Create an authentication policy that allows accounts with the # Authentication Authority Asserted Identity SID to obtain a service # ticket. allowed = ( f'O:SYD:(A;;CR;;;' f'{security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY})' ) denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=allowed, service_allowed_to=denied) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that obtaining a service ticket is allowed. self._tgs_req(tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_to_computer_allow_claims_valid(self): # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self.get_cached_creds( account_type=self.AccountType.USER) tgt = self.get_tgt(client_creds) # Create an authentication policy that allows accounts with the Claims # Valid SID to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{security.SID_CLAIMS_VALID})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=allowed, service_allowed_to=denied) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that obtaining a service ticket is allowed. self._tgs_req(tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_to_computer_allow_compounded_auth(self): # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self.get_cached_creds( account_type=self.AccountType.USER) tgt = self.get_tgt(client_creds) # Create an authentication policy that allows accounts with the # Compounded Authentication SID to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{security.SID_COMPOUNDED_AUTHENTICATION})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=allowed, service_allowed_to=denied) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that obtaining a service ticket is denied. self._tgs_req( tgt, KDC_ERR_POLICY, client_creds, target_creds, armor_tgt=mach_tgt, expect_edata=self.expect_padata_outer, # We aren’t particular about whether or not we get an NTSTATUS. expect_status=None, expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED, check_patypes=False) def test_authn_policy_allowed_to_computer_allow_authenticated_users(self): # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self.get_cached_creds( account_type=self.AccountType.USER) tgt = self.get_tgt(client_creds) # Create an authentication policy that allows accounts with the # Authenticated Users SID to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{security.SID_NT_AUTHENTICATED_USERS})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=allowed, service_allowed_to=denied) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that obtaining a service ticket is allowed. self._tgs_req(tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_to_computer_allow_ntlm_authn(self): # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self.get_cached_creds( account_type=self.AccountType.USER) tgt = self.get_tgt(client_creds) # Create an authentication policy that allows accounts with the NTLM # Authentication SID to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{security.SID_NT_NTLM_AUTHENTICATION})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=allowed, service_allowed_to=denied) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that obtaining a service ticket is denied. self._tgs_req( tgt, KDC_ERR_POLICY, client_creds, target_creds, armor_tgt=mach_tgt, expect_edata=self.expect_padata_outer, # We aren’t particular about whether or not we get an NTSTATUS. expect_status=None, expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED, check_patypes=False) def test_authn_policy_allowed_to_no_owner(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self.get_cached_creds( account_type=self.AccountType.USER) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) tgt = self.get_tgt(client_creds) # Create an authentication policy that applies to a computer and # explicitly allows the user account to obtain a service ticket. Omit # the owner (O:SY) from the SDDL. allowed = f'D:(A;;CR;;;{client_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, computer_allowed_to=allowed) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that obtaining a service ticket is denied. self._tgs_req(tgt, KDC_ERR_POLICY, client_creds, target_creds, armor_tgt=mach_tgt, expect_edata=self.expect_padata_outer, # We aren’t particular about whether or not we get an NTSTATUS. expect_status=None, expected_status=ntstatus.NT_STATUS_INVALID_PARAMETER, check_patypes=False) def test_authn_policy_allowed_to_no_owner_unenforced(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self.get_cached_creds( account_type=self.AccountType.USER) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) tgt = self.get_tgt(client_creds) # Create an unenforced authentication policy that applies to a computer # and explicitly allows the user account to obtain a service # ticket. Omit the owner (O:SY) from the SDDL. allowed = f'D:(A;;CR;;;{client_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=False, computer_allowed_to=allowed) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that obtaining a service ticket is allowed. self._tgs_req(tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_to_owner_self(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self.get_cached_creds( account_type=self.AccountType.USER) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) tgt = self.get_tgt(client_creds) # Create an authentication policy that applies to a computer and # explicitly allows the user account to obtain a service ticket. Set # the owner to the user account. allowed = f'O:{client_sid}D:(A;;CR;;;{client_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, computer_allowed_to=allowed) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that obtaining a service ticket is allowed. self._tgs_req(tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_to_owner_anon(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self.get_cached_creds( account_type=self.AccountType.USER) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) tgt = self.get_tgt(client_creds) # Create an authentication policy that applies to a computer and # explicitly allows the user account to obtain a service ticket. Set # the owner to be anonymous. allowed = f'O:AND:(A;;CR;;;{client_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, computer_allowed_to=allowed) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that obtaining a service ticket is allowed. self._tgs_req(tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_to_user_allow(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self.get_cached_creds( account_type=self.AccountType.USER) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) tgt = self.get_tgt(client_creds) # Create an authentication policy that applies to a user and explicitly # allows the user account to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{client_sid})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=allowed, computer_allowed_to=denied, service_allowed_to=denied) # Create a user account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy, spn='host/{account}') # Show that obtaining a service ticket is allowed. self._tgs_req(tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_to_user_deny(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self.get_cached_creds( account_type=self.AccountType.USER) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) tgt = self.get_tgt(client_creds) # Create an authentication policy that applies to a user and # explicitly denies the user account to obtain a service ticket. denied = f'O:SYD:(D;;CR;;;{client_sid})' allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=allowed, service_allowed_to=allowed) # Create a user account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy, spn='host/{account}') # Show that obtaining a service ticket is denied. self._tgs_req( tgt, KDC_ERR_POLICY, client_creds, target_creds, armor_tgt=mach_tgt, expect_edata=self.expect_padata_outer, # We aren’t particular about whether or not we get an NTSTATUS. expect_status=None, expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED, check_patypes=False) def test_authn_policy_allowed_to_service_allow(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self.get_cached_creds( account_type=self.AccountType.USER) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) tgt = self.get_tgt(client_creds) # Create an authentication policy that applies to a managed service and # explicitly allows the user account to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{client_sid})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=denied, service_allowed_to=allowed) # Create a managed service account with the assigned policy. target_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy) # Show that obtaining a service ticket is allowed. self._tgs_req(tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_to_service_deny(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self.get_cached_creds( account_type=self.AccountType.USER) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) tgt = self.get_tgt(client_creds) # Create an authentication policy that applies to a managed service and # explicitly denies the user account to obtain a service ticket. denied = f'O:SYD:(D;;CR;;;{client_sid})' allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=allowed, computer_allowed_to=allowed, service_allowed_to=denied) # Create a managed service account with the assigned policy. target_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy) # Show that obtaining a service ticket is denied. self._tgs_req( tgt, KDC_ERR_POLICY, client_creds, target_creds, armor_tgt=mach_tgt, expect_edata=self.expect_padata_outer, # We aren’t particular about whether or not we get an NTSTATUS. expect_status=None, expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED, check_patypes=False) def test_authn_policy_allowed_to_user_allow_from_rodc(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self._get_creds(account_type=self.AccountType.USER, allowed_rodc=True) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) # Modify the TGT to be issued by an RODC. tgt = self.issued_by_rodc(self.get_tgt(client_creds)) # Create an authentication policy that applies to a user and explicitly # allows the user account to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{client_sid})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=allowed, computer_allowed_to=denied, service_allowed_to=denied) # Create a user account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy, spn='host/{account}') # Show that obtaining a service ticket is allowed. self._tgs_req(tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_to_user_deny_from_rodc(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self._get_creds(account_type=self.AccountType.USER, allowed_rodc=True) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) # Modify the TGT to be issued by an RODC. tgt = self.issued_by_rodc(self.get_tgt(client_creds)) # Create an authentication policy that applies to a user and # explicitly denies the user account to obtain a service ticket. denied = f'O:SYD:(D;;CR;;;{client_sid})' allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=allowed, service_allowed_to=allowed) # Create a user account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy, spn='host/{account}') # Show that obtaining a service ticket is denied. self._tgs_req( tgt, KDC_ERR_POLICY, client_creds, target_creds, armor_tgt=mach_tgt, expect_edata=self.expect_padata_outer, # We aren’t particular about whether or not we get an NTSTATUS. expect_status=None, expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED) def test_authn_policy_allowed_to_computer_allow_from_rodc(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self._get_creds(account_type=self.AccountType.USER, allowed_rodc=True) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) # Modify the TGT to be issued by an RODC. tgt = self.issued_by_rodc(self.get_tgt(client_creds)) # Create an authentication policy that applies to a computer and # explicitly allows the user account to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{client_sid})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=allowed, service_allowed_to=denied) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that obtaining a service ticket is allowed. self._tgs_req(tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_to_computer_deny_from_rodc(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self._get_creds(account_type=self.AccountType.USER, allowed_rodc=True) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) # Modify the TGT to be issued by an RODC. tgt = self.issued_by_rodc(self.get_tgt(client_creds)) # Create an authentication policy that applies to a computer and # explicitly denies the user account to obtain a service ticket. denied = f'O:SYD:(D;;CR;;;{client_sid})' allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=allowed, computer_allowed_to=denied, service_allowed_to=allowed) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that obtaining a service ticket is denied. self._tgs_req( tgt, KDC_ERR_POLICY, client_creds, target_creds, armor_tgt=mach_tgt, expect_edata=self.expect_padata_outer, # We aren’t particular about whether or not we get an NTSTATUS. expect_status=None, expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,) def test_authn_policy_allowed_to_service_allow_from_rodc(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self._get_creds(account_type=self.AccountType.USER, allowed_rodc=True) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) # Modify the TGT to be issued by an RODC. tgt = self.issued_by_rodc(self.get_tgt(client_creds)) # Create an authentication policy that applies to a managed service and # explicitly allows the user account to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{client_sid})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=denied, service_allowed_to=allowed) # Create a managed service account with the assigned policy. target_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy) # Show that obtaining a service ticket is allowed. self._tgs_req(tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_to_service_deny_from_rodc(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self._get_creds(account_type=self.AccountType.USER, allowed_rodc=True) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) # Modify the TGT to be issued by an RODC. tgt = self.issued_by_rodc(self.get_tgt(client_creds)) # Create an authentication policy that applies to a managed service and # explicitly denies the user account to obtain a service ticket. denied = f'O:SYD:(D;;CR;;;{client_sid})' allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=allowed, computer_allowed_to=allowed, service_allowed_to=denied) # Create a managed service account with the assigned policy. target_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy) # Show that obtaining a service ticket is denied. self._tgs_req( tgt, KDC_ERR_POLICY, client_creds, target_creds, armor_tgt=mach_tgt, expect_edata=self.expect_padata_outer, # We aren’t particular about whether or not we get an NTSTATUS. expect_status=None, expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED) def test_authn_policy_allowed_to_user_allow_group_not_a_member(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a new group. group_name = self.get_new_username() group_dn = self.create_group(samdb, group_name) group_sid = self.get_objectSid(samdb, group_dn) # Create a user account which does not belong to the group. client_creds = self.get_cached_creds( account_type=self.AccountType.USER) tgt = self.get_tgt(client_creds) # Create an authentication policy that allows accounts belonging to the # group. allowed = f'O:SYD:(A;;CR;;;{group_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=allowed) # Create a user account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy, spn='host/{account}') # Show that we get a policy error, as the user account does not belong # to the group. self._tgs_req( tgt, KDC_ERR_POLICY, client_creds, target_creds, armor_tgt=mach_tgt, expect_edata=self.expect_padata_outer, # We aren’t particular about whether or not we get an NTSTATUS. expect_status=None, expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED, check_patypes=False) def test_authn_policy_allowed_to_user_allow_group_member(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a new group. group_name = self.get_new_username() group_dn = self.create_group(samdb, group_name) group_sid = self.get_objectSid(samdb, group_dn) # Create a user account that belongs to the group. client_creds = self.get_cached_creds( account_type=self.AccountType.USER, opts={'member_of': (group_dn,)}) tgt = self.get_tgt(client_creds) # Create an authentication policy that allows accounts belonging to the # group. allowed = f'O:SYD:(A;;CR;;;{group_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=allowed) # Create a user account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy, spn='host/{account}') # Show that we can get a service ticket, since the user account belongs # to the group. self._tgs_req(tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_to_user_allow_domain_local_group(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a new domain-local group. group_name = self.get_new_username() group_dn = self.create_group(samdb, group_name, gtype=GroupType.DOMAIN_LOCAL.value) group_sid = self.get_objectSid(samdb, group_dn) # Create a user account that belongs to the group. client_creds = self.get_cached_creds( account_type=self.AccountType.USER, opts={'member_of': (group_dn,)}) tgt = self.get_tgt(client_creds) # Create an authentication policy that allows accounts belonging to the # group. allowed = f'O:SYD:(A;;CR;;;{group_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=allowed) # Create a user account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy, spn='host/{account}') # Show that the groups in the TGT are expanded to include the # domain-local group. self._tgs_req(tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_to_computer_allow_asserted_identity_from_rodc(self): # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self._get_creds(account_type=self.AccountType.USER, allowed_rodc=True) # Modify the TGT to be issued by an RODC. tgt = self.issued_by_rodc(self.get_tgt(client_creds)) # Create an authentication policy that allows accounts with the # Authentication Authority Asserted Identity SID to obtain a service # ticket. allowed = ( f'O:SYD:(A;;CR;;;' f'{security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY})' ) denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=allowed, service_allowed_to=denied) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that obtaining a service ticket is allowed. self._tgs_req(tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_to_computer_allow_claims_valid_from_rodc(self): # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self._get_creds(account_type=self.AccountType.USER, allowed_rodc=True) # Modify the TGT to be issued by an RODC. tgt = self.issued_by_rodc(self.get_tgt(client_creds)) # Create an authentication policy that allows accounts with the Claims # Valid SID to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{security.SID_CLAIMS_VALID})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=allowed, service_allowed_to=denied) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that obtaining a service ticket is not allowed. self._tgs_req(tgt, KDC_ERR_POLICY, client_creds, target_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_to_computer_allow_compounded_authn_from_rodc(self): # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self._get_creds(account_type=self.AccountType.USER, allowed_rodc=True) # Modify the TGT to be issued by an RODC. tgt = self.issued_by_rodc(self.get_tgt(client_creds)) # Create an authentication policy that allows accounts with the # Compounded Authentication SID to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{security.SID_COMPOUNDED_AUTHENTICATION})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=allowed, service_allowed_to=denied) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that obtaining a service ticket is denied. self._tgs_req( tgt, KDC_ERR_POLICY, client_creds, target_creds, armor_tgt=mach_tgt, expect_edata=self.expect_padata_outer, # We aren’t particular about whether or not we get an NTSTATUS. expect_status=None, expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED) def test_authn_policy_allowed_to_computer_allow_authenticated_users_from_rodc(self): # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self._get_creds(account_type=self.AccountType.USER, allowed_rodc=True) # Modify the TGT to be issued by an RODC. tgt = self.issued_by_rodc(self.get_tgt(client_creds)) # Create an authentication policy that allows accounts with the # Authenticated Users SID to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{security.SID_NT_AUTHENTICATED_USERS})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=allowed, service_allowed_to=denied) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that obtaining a service ticket is allowed. self._tgs_req(tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_to_computer_allow_ntlm_authn_from_rodc(self): # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self._get_creds(account_type=self.AccountType.USER, allowed_rodc=True) # Modify the TGT to be issued by an RODC. tgt = self.issued_by_rodc(self.get_tgt(client_creds)) # Create an authentication policy that allows accounts with the NTLM # Authentication SID to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{security.SID_NT_NTLM_AUTHENTICATION})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=allowed, service_allowed_to=denied) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that obtaining a service ticket is denied. self._tgs_req( tgt, KDC_ERR_POLICY, client_creds, target_creds, armor_tgt=mach_tgt, expect_edata=self.expect_padata_outer, # We aren’t particular about whether or not we get an NTSTATUS. expect_status=None, expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED) def test_authn_policy_allowed_to_user_allow_group_not_a_member_from_rodc(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a new group. group_name = self.get_new_username() group_dn = self.create_group(samdb, group_name) group_sid = self.get_objectSid(samdb, group_dn) # Create a user account which does not belong to the group. client_creds = self._get_creds(account_type=self.AccountType.USER, allowed_rodc=True) # Modify the TGT to be issued by an RODC. tgt = self.issued_by_rodc(self.get_tgt(client_creds)) # Create an authentication policy that allows accounts belonging to the # group. allowed = f'O:SYD:(A;;CR;;;{group_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=allowed) # Create a user account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy, spn='host/{account}') # Show that we get a policy error, as the user account does not belong # to the group. self._tgs_req( tgt, KDC_ERR_POLICY, client_creds, target_creds, armor_tgt=mach_tgt, expect_edata=self.expect_padata_outer, # We aren’t particular about whether or not we get an NTSTATUS. expect_status=None, expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED) def test_authn_policy_allowed_to_user_allow_group_member_from_rodc(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a new group. group_name = self.get_new_username() group_dn = self.create_group(samdb, group_name) group_sid = self.get_objectSid(samdb, group_dn) # Create a user account that belongs to the group. client_creds = self.get_cached_creds( account_type=self.AccountType.USER, opts={'member_of': (group_dn,), 'allowed_replication_mock': True, 'revealed_to_mock_rodc': True}) # Modify the TGT to be issued by an RODC. tgt = self.issued_by_rodc(self.get_tgt(client_creds)) # Create an authentication policy that allows accounts belonging to the # group. allowed = f'O:SYD:(A;;CR;;;{group_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=allowed) # Create a user account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy, spn='host/{account}') # Show that we can get a service ticket, since the user account belongs # to the group. self._tgs_req(tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_to_user_allow_domain_local_group_from_rodc(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a new domain-local group. group_name = self.get_new_username() group_dn = self.create_group(samdb, group_name, gtype=GroupType.DOMAIN_LOCAL.value) group_sid = self.get_objectSid(samdb, group_dn) # Create a user account that belongs to the group. client_creds = self.get_cached_creds( account_type=self.AccountType.USER, opts={'member_of': (group_dn,), 'allowed_replication_mock': True, 'revealed_to_mock_rodc': True}) # Modify the TGT to be issued by an RODC. tgt = self.issued_by_rodc(self.get_tgt(client_creds)) # Create an authentication policy that allows accounts belonging to the # group. allowed = f'O:SYD:(A;;CR;;;{group_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=allowed) # Create a user account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy, spn='host/{account}') # Show that the groups in the TGT are expanded to include the # domain-local group. self._tgs_req(tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_to_computer_allow_to_self(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a computer account. client_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER, opts={'id': 1}, use_cache=False) client_dn = client_creds.get_dn() client_sid = self.get_objectSid(samdb, client_dn) tgt = self.get_tgt(client_creds) # Create an authentication policy that applies to a computer and # explicitly allows the user account to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{client_sid})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=allowed, service_allowed_to=denied) # Assign the policy to the account. self.add_attribute(samdb, str(client_dn), 'msDS-AssignedAuthNPolicy', str(policy)) # Show that obtaining a service ticket to ourselves is allowed. self._tgs_req(tgt, 0, client_creds, client_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_to_computer_deny_to_self(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a computer account. client_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER, opts={'id': 1}, use_cache=False) client_dn = client_creds.get_dn() client_sid = self.get_objectSid(samdb, client_dn) tgt = self.get_tgt(client_creds) # Create an authentication policy that applies to a computer and # explicitly denies the user account to obtain a service ticket. denied = f'O:SYD:(D;;CR;;;{client_sid})' allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=allowed, computer_allowed_to=denied, service_allowed_to=allowed) # Assign the policy to the account. self.add_attribute(samdb, str(client_dn), 'msDS-AssignedAuthNPolicy', str(policy)) # Show that obtaining a service ticket to ourselves is allowed, despite # the policy disallowing it. self._tgs_req(tgt, 0, client_creds, client_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_to_computer_allow_to_self_with_self(self): samdb = self.get_samdb() # Create a computer account. client_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER, use_cache=False) client_dn = client_creds.get_dn() client_sid = self.get_objectSid(samdb, client_dn) tgt = self.get_tgt(client_creds) # Create an authentication policy that applies to a computer and # explicitly allows the account to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{client_sid})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=allowed, service_allowed_to=denied) # Assign the policy to the account. self.add_attribute(samdb, str(client_dn), 'msDS-AssignedAuthNPolicy', str(policy)) # Show that obtaining a service ticket to ourselves armored with our # own TGT is allowed. self._tgs_req(tgt, 0, client_creds, client_creds, armor_tgt=tgt) def test_authn_policy_allowed_to_computer_deny_to_self_with_self(self): samdb = self.get_samdb() # Create a computer account. client_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER, use_cache=False) client_dn = client_creds.get_dn() client_sid = self.get_objectSid(samdb, client_dn) tgt = self.get_tgt(client_creds) # Create an authentication policy that applies to a computer and # explicitly denies the account to obtain a service ticket. denied = f'O:SYD:(D;;CR;;;{client_sid})' allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=allowed, computer_allowed_to=denied, service_allowed_to=allowed) # Assign the policy to the account. self.add_attribute(samdb, str(client_dn), 'msDS-AssignedAuthNPolicy', str(policy)) # Show that obtaining a service ticket to ourselves armored with our # own TGT is allowed, despite the policy’s disallowing it. self._tgs_req(tgt, 0, client_creds, client_creds, armor_tgt=tgt) def test_authn_policy_allowed_to_user_allow_s4u2self(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self.get_cached_creds( account_type=self.AccountType.USER) client_cname = self.PrincipalName_create( name_type=NT_PRINCIPAL, names=[client_creds.get_username()]) client_realm = client_creds.get_realm() client_sid = self.get_objectSid(samdb, client_creds.get_dn()) # Create an authentication policy that applies to a computer and # explicitly allows the user account to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{client_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, computer_allowed_to=allowed) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) target_tgt = self.get_tgt(target_creds) def generate_s4u2self_padata(_kdc_exchange_dict, _callback_dict, req_body): padata = self.PA_S4U2Self_create( name=client_cname, realm=client_realm, tgt_session_key=target_tgt.session_key, ctype=None) return [padata], req_body # Show that obtaining a service ticket with S4U2Self is allowed. self._tgs_req(target_tgt, 0, target_creds, target_creds, expected_cname=client_cname, generate_fast_padata_fn=generate_s4u2self_padata, armor_tgt=mach_tgt) def test_authn_policy_allowed_to_user_deny_s4u2self(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self.get_cached_creds( account_type=self.AccountType.USER) client_cname = self.PrincipalName_create( name_type=NT_PRINCIPAL, names=[client_creds.get_username()]) client_realm = client_creds.get_realm() client_sid = self.get_objectSid(samdb, client_creds.get_dn()) # Create an authentication policy that applies to a computer and # explicitly denies the user account to obtain a service ticket. denied = f'O:SYD:(D;;CR;;;{client_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, computer_allowed_to=denied) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) target_tgt = self.get_tgt(target_creds) def generate_s4u2self_padata(_kdc_exchange_dict, _callback_dict, req_body): padata = self.PA_S4U2Self_create( name=client_cname, realm=client_realm, tgt_session_key=target_tgt.session_key, ctype=None) return [padata], req_body # Show that obtaining a service ticket with S4U2Self is allowed, # despite the policy. self._tgs_req(target_tgt, 0, target_creds, target_creds, expected_cname=client_cname, generate_fast_padata_fn=generate_s4u2self_padata, armor_tgt=mach_tgt) def test_authn_policy_allowed_to_user_allow_constrained_delegation(self): samdb = self.get_samdb() client_creds = self.get_cached_creds( account_type=self.AccountType.USER) client_dn = client_creds.get_dn() client_sid = self.get_objectSid(samdb, client_dn) client_username = client_creds.get_username() client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL, names=[client_username]) client_tkt_options = 'forwardable' expected_flags = krb5_asn1.TicketFlags(client_tkt_options) client_tgt = self.get_tgt(client_creds, kdc_options=client_tkt_options, expected_flags=expected_flags) # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a target account. target_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER, opts={'id': 1}, use_cache=False) target_spn = target_creds.get_spn() service_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER, opts={ 'delegation_to_spn': target_spn, }) service_sid = self.get_objectSid(samdb, service_creds.get_dn()) service_tgt = self.get_tgt(service_creds) # Create an authentication policy that applies to a computer and # explicitly allows the service account to obtain a service ticket, # while denying the user. allowed = f'O:SYD:(A;;CR;;;{service_sid})(D;;CR;;;{client_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, computer_allowed_to=allowed) # Assign the policy to the target account. self.add_attribute(samdb, str(target_creds.get_dn()), 'msDS-AssignedAuthNPolicy', str(policy)) client_service_tkt = self.get_service_ticket( client_tgt, service_creds, kdc_options=client_tkt_options, expected_flags=expected_flags) kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt')) target_decryption_key = self.TicketDecryptionKey_from_creds( target_creds) target_etypes = target_creds.tgs_supported_enctypes service_name = service_creds.get_username() if service_name[-1] == '$': service_name = service_name[:-1] expected_transited_services = [ f'host/{service_name}@{service_creds.get_realm()}' ] # Show that obtaining a service ticket with constrained delegation is # allowed. self._tgs_req(service_tgt, 0, service_creds, target_creds, armor_tgt=mach_tgt, kdc_options=kdc_options, expected_cname=client_cname, expected_account_name=client_username, additional_ticket=client_service_tkt, decryption_key=target_decryption_key, expected_sid=client_sid, expected_supported_etypes=target_etypes, expected_proxy_target=target_spn, expected_transited_services=expected_transited_services) def test_authn_policy_allowed_to_user_deny_constrained_delegation(self): samdb = self.get_samdb() client_creds = self.get_cached_creds( account_type=self.AccountType.USER) client_dn = client_creds.get_dn() client_sid = self.get_objectSid(samdb, client_dn) client_tkt_options = 'forwardable' expected_flags = krb5_asn1.TicketFlags(client_tkt_options) client_tgt = self.get_tgt(client_creds, kdc_options=client_tkt_options, expected_flags=expected_flags) # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a target account. target_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER, opts={'id': 1}, use_cache=False) target_spn = target_creds.get_spn() service_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER, opts={ 'delegation_to_spn': target_spn, }) service_sid = self.get_objectSid(samdb, service_creds.get_dn()) service_tgt = self.get_tgt(service_creds) # Create an authentication policy that applies to a computer and # explicitly denies the service account to obtain a service ticket, # while allowing the user. denied = f'O:SYD:(D;;CR;;;{service_sid})(A;;CR;;;{client_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, computer_allowed_to=denied) # Assign the policy to the target account. self.add_attribute(samdb, str(target_creds.get_dn()), 'msDS-AssignedAuthNPolicy', str(policy)) client_service_tkt = self.get_service_ticket( client_tgt, service_creds, kdc_options=client_tkt_options, expected_flags=expected_flags) kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt')) target_decryption_key = self.TicketDecryptionKey_from_creds( target_creds) # Show that obtaining a service ticket with constrained delegation is # not allowed. self._tgs_req( service_tgt, KDC_ERR_POLICY, service_creds, target_creds, armor_tgt=mach_tgt, kdc_options=kdc_options, additional_ticket=client_service_tkt, decryption_key=target_decryption_key, expect_edata=self.expect_padata_outer, # We aren’t particular about whether or not we get an NTSTATUS. expect_status=None, expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED, check_patypes=False) def test_authn_policy_allowed_to_user_allow_constrained_delegation_wrong_sname(self): client_creds = self.get_cached_creds( account_type=self.AccountType.USER, use_cache=False) client_tkt_options = 'forwardable' expected_flags = krb5_asn1.TicketFlags(client_tkt_options) client_tgt = self.get_tgt(client_creds, kdc_options=client_tkt_options, expected_flags=expected_flags) # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a target account. target_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER, opts={'id': 1}) target_spn = target_creds.get_spn() service_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER, opts={'delegation_to_spn': target_spn}) service_tgt = self.get_tgt(service_creds) client_service_tkt = self.get_service_ticket( client_tgt, service_creds, kdc_options=client_tkt_options, expected_flags=expected_flags, fresh=True) # Change the ‘sname’ of the ticket to an incorrect value. client_service_tkt.set_sname(self.get_krbtgt_sname()) kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt')) target_decryption_key = self.TicketDecryptionKey_from_creds( target_creds) # Show that obtaining a service ticket with constrained delegation # fails if the sname doesn’t match. self._tgs_req(service_tgt, KDC_ERR_BADOPTION, service_creds, target_creds, armor_tgt=mach_tgt, kdc_options=kdc_options, additional_ticket=client_service_tkt, decryption_key=target_decryption_key, expect_edata=self.expect_padata_outer, check_patypes=False) def test_authn_policy_allowed_to_user_allow_rbcd(self): samdb = self.get_samdb() functional_level = self.get_domain_functional_level(samdb) if functional_level < dsdb.DS_DOMAIN_FUNCTION_2008: self.skipTest('RBCD requires FL2008') client_creds = self.get_cached_creds( account_type=self.AccountType.USER) client_dn = client_creds.get_dn() client_sid = self.get_objectSid(samdb, client_dn) client_username = client_creds.get_username() client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL, names=[client_username]) client_tkt_options = 'forwardable' expected_flags = krb5_asn1.TicketFlags(client_tkt_options) client_tgt = self.get_tgt(client_creds, kdc_options=client_tkt_options, expected_flags=expected_flags) # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) service_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER, opts={'id': 1}) service_sid = self.get_objectSid(samdb, service_creds.get_dn()) service_tgt = self.get_tgt(service_creds) # Create an authentication policy that applies to a computer and # explicitly allows the service account to obtain a service ticket, # while denying the user. allowed = f'O:SYD:(A;;CR;;;{service_sid})(D;;CR;;;{client_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, computer_allowed_to=allowed) # Create a target account with the assigned policy. target_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER, opts={ 'assigned_policy': str(policy), 'delegation_from_dn': str(service_creds.get_dn()), }) client_service_tkt = self.get_service_ticket( client_tgt, service_creds, kdc_options=client_tkt_options, expected_flags=expected_flags) kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt')) target_decryption_key = self.TicketDecryptionKey_from_creds( target_creds) target_etypes = target_creds.tgs_supported_enctypes service_name = service_creds.get_username() if service_name[-1] == '$': service_name = service_name[:-1] expected_transited_services = [ f'host/{service_name}@{service_creds.get_realm()}' ] # Show that obtaining a service ticket with RBCD is allowed. self._tgs_req(service_tgt, 0, service_creds, target_creds, armor_tgt=mach_tgt, kdc_options=kdc_options, pac_options='1001', # supports claims, RBCD expected_cname=client_cname, expected_account_name=client_username, additional_ticket=client_service_tkt, decryption_key=target_decryption_key, expected_sid=client_sid, expected_supported_etypes=target_etypes, expected_proxy_target=target_creds.get_spn(), expected_transited_services=expected_transited_services) def test_authn_policy_allowed_to_user_deny_rbcd(self): samdb = self.get_samdb() functional_level = self.get_domain_functional_level(samdb) if functional_level < dsdb.DS_DOMAIN_FUNCTION_2008: self.skipTest('RBCD requires FL2008') client_creds = self.get_cached_creds( account_type=self.AccountType.USER) client_dn = client_creds.get_dn() client_sid = self.get_objectSid(samdb, client_dn) client_tkt_options = 'forwardable' expected_flags = krb5_asn1.TicketFlags(client_tkt_options) client_tgt = self.get_tgt(client_creds, kdc_options=client_tkt_options, expected_flags=expected_flags) # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) service_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER, opts={'id': 1}) service_sid = self.get_objectSid(samdb, service_creds.get_dn()) service_tgt = self.get_tgt(service_creds) # Create an authentication policy that applies to a computer and # explicitly denies the service account to obtain a service ticket, # while allowing the user. denied = f'O:SYD:(D;;CR;;;{service_sid})(A;;CR;;;{client_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, computer_allowed_to=denied) # Create a target account with the assigned policy. target_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER, opts={ 'assigned_policy': str(policy), 'delegation_from_dn': str(service_creds.get_dn()), }) client_service_tkt = self.get_service_ticket( client_tgt, service_creds, kdc_options=client_tkt_options, expected_flags=expected_flags) kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt')) target_decryption_key = self.TicketDecryptionKey_from_creds( target_creds) # Show that obtaining a service ticket with RBCD is not allowed. self._tgs_req(service_tgt, KDC_ERR_POLICY, service_creds, target_creds, armor_tgt=mach_tgt, kdc_options=kdc_options, pac_options='1001', # supports claims, RBCD additional_ticket=client_service_tkt, decryption_key=target_decryption_key, expect_edata=self.expect_padata_outer, check_patypes=False) def test_authn_policy_allowed_to_user_allow_rbcd_wrong_sname(self): samdb = self.get_samdb() functional_level = self.get_domain_functional_level(samdb) if functional_level < dsdb.DS_DOMAIN_FUNCTION_2008: self.skipTest('RBCD requires FL2008') client_creds = self.get_cached_creds( account_type=self.AccountType.USER, use_cache=False) client_tkt_options = 'forwardable' expected_flags = krb5_asn1.TicketFlags(client_tkt_options) client_tgt = self.get_tgt(client_creds, kdc_options=client_tkt_options, expected_flags=expected_flags) # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) service_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER, opts={'id': 1}) service_tgt = self.get_tgt(service_creds) # Create a target account with the assigned policy. target_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER, opts={ 'delegation_from_dn': str(service_creds.get_dn()), }) client_service_tkt = self.get_service_ticket( client_tgt, service_creds, kdc_options=client_tkt_options, expected_flags=expected_flags, fresh=True) # Change the ‘sname’ of the ticket to an incorrect value. client_service_tkt.set_sname(self.get_krbtgt_sname()) kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt')) target_decryption_key = self.TicketDecryptionKey_from_creds( target_creds) # Show that obtaining a service ticket with RBCD fails if the sname # doesn’t match. self._tgs_req(service_tgt, KDC_ERR_BADOPTION, service_creds, target_creds, armor_tgt=mach_tgt, kdc_options=kdc_options, pac_options='1001', # supports claims, RBCD additional_ticket=client_service_tkt, decryption_key=target_decryption_key, expect_edata=self.expect_padata_outer, check_patypes=False) def test_authn_policy_allowed_to_user_allow_constrained_delegation_to_self(self): samdb = self.get_samdb() client_creds = self.get_cached_creds( account_type=self.AccountType.USER) client_dn = client_creds.get_dn() client_sid = self.get_objectSid(samdb, client_dn) client_username = client_creds.get_username() client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL, names=[client_username]) client_tkt_options = 'forwardable' expected_flags = krb5_asn1.TicketFlags(client_tkt_options) client_tgt = self.get_tgt(client_creds, kdc_options=client_tkt_options, expected_flags=expected_flags) # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a service account. service_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER, opts={'id': 1}, use_cache=False) service_dn_str = str(service_creds.get_dn()) service_spn = service_creds.get_spn() service_sid = self.get_objectSid(samdb, service_creds.get_dn()) service_tgt = self.get_tgt(service_creds) # Allow delegation to ourselves. self.add_attribute(samdb, service_dn_str, 'msDS-AllowedToDelegateTo', service_spn) # Create an authentication policy that applies to a computer and # explicitly allows the client account to obtain a service ticket, # while denying the service. allowed = f'O:SYD:(A;;CR;;;{client_sid})(D;;CR;;;{service_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, computer_allowed_to=allowed) # Assign the policy to the service account. self.add_attribute(samdb, service_dn_str, 'msDS-AssignedAuthNPolicy', str(policy)) client_service_tkt = self.get_service_ticket( client_tgt, service_creds, kdc_options=client_tkt_options, expected_flags=expected_flags) kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt')) target_decryption_key = self.TicketDecryptionKey_from_creds( service_creds) target_etypes = service_creds.tgs_supported_enctypes service_name = service_creds.get_username() if service_name[-1] == '$': service_name = service_name[:-1] expected_transited_services = [ f'host/{service_name}@{service_creds.get_realm()}' ] # Show that obtaining a service ticket to ourselves with constrained # delegation is allowed. self._tgs_req(service_tgt, 0, service_creds, service_creds, armor_tgt=mach_tgt, kdc_options=kdc_options, expected_cname=client_cname, expected_account_name=client_username, additional_ticket=client_service_tkt, decryption_key=target_decryption_key, expected_sid=client_sid, expected_supported_etypes=target_etypes, expected_proxy_target=service_spn, expected_transited_services=expected_transited_services) def test_authn_policy_allowed_to_user_deny_constrained_delegation_to_self(self): samdb = self.get_samdb() client_creds = self.get_cached_creds( account_type=self.AccountType.USER) client_dn = client_creds.get_dn() client_sid = self.get_objectSid(samdb, client_dn) client_username = client_creds.get_username() client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL, names=[client_username]) client_tkt_options = 'forwardable' expected_flags = krb5_asn1.TicketFlags(client_tkt_options) client_tgt = self.get_tgt(client_creds, kdc_options=client_tkt_options, expected_flags=expected_flags) # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a service account. service_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER, opts={'id': 1}, use_cache=False) service_dn_str = str(service_creds.get_dn()) service_spn = service_creds.get_spn() service_sid = self.get_objectSid(samdb, service_creds.get_dn()) service_tgt = self.get_tgt(service_creds) # Allow delegation to ourselves. self.add_attribute(samdb, service_dn_str, 'msDS-AllowedToDelegateTo', service_spn) client_service_tkt = self.get_service_ticket( client_tgt, service_creds, kdc_options=client_tkt_options, expected_flags=expected_flags) # Create an authentication policy that applies to a computer and # explicitly denies the client account to obtain a service ticket, # while allowing the service. allowed = f'O:SYD:(D;;CR;;;{client_sid})(A;;CR;;;{service_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, computer_allowed_to=allowed) # Assign the policy to the service account. self.add_attribute(samdb, service_dn_str, 'msDS-AssignedAuthNPolicy', str(policy)) kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt')) target_decryption_key = self.TicketDecryptionKey_from_creds( service_creds) target_etypes = service_creds.tgs_supported_enctypes service_name = service_creds.get_username() if service_name[-1] == '$': service_name = service_name[:-1] expected_transited_services = [ f'host/{service_name}@{service_creds.get_realm()}' ] # Show that obtaining a service ticket to ourselves with constrained # delegation is allowed, despite the policy’s disallowing it. self._tgs_req(service_tgt, 0, service_creds, service_creds, armor_tgt=mach_tgt, kdc_options=kdc_options, expected_cname=client_cname, expected_account_name=client_username, additional_ticket=client_service_tkt, decryption_key=target_decryption_key, expected_sid=client_sid, expected_supported_etypes=target_etypes, expected_proxy_target=service_spn, expected_transited_services=expected_transited_services) def test_authn_policy_allowed_to_user_not_allowed_constrained_delegation_to_self(self): samdb = self.get_samdb() client_creds = self.get_cached_creds( account_type=self.AccountType.USER) client_dn = client_creds.get_dn() client_sid = self.get_objectSid(samdb, client_dn) client_tkt_options = 'forwardable' expected_flags = krb5_asn1.TicketFlags(client_tkt_options) client_tgt = self.get_tgt(client_creds, kdc_options=client_tkt_options, expected_flags=expected_flags) # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a service account. service_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER, opts={'id': 1}, use_cache=False) service_dn_str = str(service_creds.get_dn()) service_sid = self.get_objectSid(samdb, service_creds.get_dn()) service_tgt = self.get_tgt(service_creds) # Don’t set msDS-AllowedToDelegateTo. # Create an authentication policy that applies to a computer and # explicitly allows the client account to obtain a service ticket, # while denying the service. allowed = f'O:SYD:(A;;CR;;;{client_sid})(D;;CR;;;{service_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, computer_allowed_to=allowed) # Assign the policy to the service account. self.add_attribute(samdb, service_dn_str, 'msDS-AssignedAuthNPolicy', str(policy)) client_service_tkt = self.get_service_ticket( client_tgt, service_creds, kdc_options=client_tkt_options, expected_flags=expected_flags) kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt')) target_decryption_key = self.TicketDecryptionKey_from_creds( service_creds) # Show that obtaining a service ticket to ourselves with constrained # delegation is not allowed without msDS-AllowedToDelegateTo. self._tgs_req(service_tgt, KDC_ERR_BADOPTION, service_creds, service_creds, armor_tgt=mach_tgt, kdc_options=kdc_options, additional_ticket=client_service_tkt, decryption_key=target_decryption_key, expect_edata=self.expect_padata_outer, check_patypes=False) def test_authn_policy_allowed_to_user_allow_rbcd_to_self(self): samdb = self.get_samdb() functional_level = self.get_domain_functional_level(samdb) if functional_level < dsdb.DS_DOMAIN_FUNCTION_2008: self.skipTest('RBCD requires FL2008') client_creds = self.get_cached_creds( account_type=self.AccountType.USER) client_dn = client_creds.get_dn() client_sid = self.get_objectSid(samdb, client_dn) client_username = client_creds.get_username() client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL, names=[client_username]) client_tkt_options = 'forwardable' expected_flags = krb5_asn1.TicketFlags(client_tkt_options) client_tgt = self.get_tgt(client_creds, kdc_options=client_tkt_options, expected_flags=expected_flags) # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a service account allowed to delegate to itself. We can’t use # a more specific ACE containing the account’s SID (obtained # post-creation) as Samba (unlike Windows) won’t let us modify # msDS-AllowedToActOnBehalfOfOtherIdentity without being System. domain_sid = security.dom_sid(samdb.get_domain_sid()) security_descriptor = security.descriptor.from_sddl( 'O:BAD:(A;;CR;;;WD)', domain_sid) service_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER, opts={'delegation_from_dn': ndr_pack(security_descriptor)}, use_cache=False) service_dn_str = str(service_creds.get_dn()) service_sid = self.get_objectSid(samdb, service_creds.get_dn()) service_tgt = self.get_tgt(service_creds) # Create an authentication policy that applies to a computer and # explicitly allows the client account to obtain a service ticket, # while denying the service. allowed = f'O:SYD:(A;;CR;;;{client_sid})(D;;CR;;;{service_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, computer_allowed_to=allowed) # Assign the policy to the service account. self.add_attribute(samdb, service_dn_str, 'msDS-AssignedAuthNPolicy', str(policy)) client_service_tkt = self.get_service_ticket( client_tgt, service_creds, kdc_options=client_tkt_options, expected_flags=expected_flags) kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt')) service_decryption_key = self.TicketDecryptionKey_from_creds( service_creds) service_etypes = service_creds.tgs_supported_enctypes service_name = service_creds.get_username() if service_name[-1] == '$': service_name = service_name[:-1] expected_transited_services = [ f'host/{service_name}@{service_creds.get_realm()}' ] # Show that obtaining a service ticket to ourselves with RBCD is # allowed. self._tgs_req(service_tgt, 0, service_creds, service_creds, armor_tgt=mach_tgt, kdc_options=kdc_options, pac_options='1001', # supports claims, RBCD expected_cname=client_cname, expected_account_name=client_username, additional_ticket=client_service_tkt, decryption_key=service_decryption_key, expected_sid=client_sid, expected_supported_etypes=service_etypes, expected_proxy_target=service_creds.get_spn(), expected_transited_services=expected_transited_services) def test_authn_policy_allowed_to_user_deny_rbcd_to_self(self): samdb = self.get_samdb() functional_level = self.get_domain_functional_level(samdb) if functional_level < dsdb.DS_DOMAIN_FUNCTION_2008: self.skipTest('RBCD requires FL2008') client_creds = self.get_cached_creds( account_type=self.AccountType.USER) client_dn = client_creds.get_dn() client_sid = self.get_objectSid(samdb, client_dn) client_username = client_creds.get_username() client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL, names=[client_username]) client_tkt_options = 'forwardable' expected_flags = krb5_asn1.TicketFlags(client_tkt_options) client_tgt = self.get_tgt(client_creds, kdc_options=client_tkt_options, expected_flags=expected_flags) # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a service account allowed to delegate to itself. We can’t use # a more specific ACE containing the account’s SID (obtained # post-creation) as Samba (unlike Windows) won’t let us modify # msDS-AllowedToActOnBehalfOfOtherIdentity without being System. domain_sid = security.dom_sid(samdb.get_domain_sid()) security_descriptor = security.descriptor.from_sddl( 'O:BAD:(A;;CR;;;WD)', domain_sid) service_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER, opts={'delegation_from_dn': ndr_pack(security_descriptor)}, use_cache=False) service_dn_str = str(service_creds.get_dn()) service_sid = self.get_objectSid(samdb, service_creds.get_dn()) service_tgt = self.get_tgt(service_creds) client_service_tkt = self.get_service_ticket( client_tgt, service_creds, kdc_options=client_tkt_options, expected_flags=expected_flags) # Create an authentication policy that applies to a computer and # explicitly denies the client account to obtain a service ticket, # while allowing the service. allowed = f'O:SYD:(D;;CR;;;{client_sid})(A;;CR;;;{service_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, computer_allowed_to=allowed) # Assign the policy to the service account. self.add_attribute(samdb, service_dn_str, 'msDS-AssignedAuthNPolicy', str(policy)) kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt')) service_decryption_key = self.TicketDecryptionKey_from_creds( service_creds) service_etypes = service_creds.tgs_supported_enctypes service_name = service_creds.get_username() if service_name[-1] == '$': service_name = service_name[:-1] expected_transited_services = [ f'host/{service_name}@{service_creds.get_realm()}' ] # Show that obtaining a service ticket to ourselves with RBCD is # allowed, despite the policy’s disallowing it. self._tgs_req(service_tgt, 0, service_creds, service_creds, armor_tgt=mach_tgt, kdc_options=kdc_options, pac_options='1001', # supports claims, RBCD expected_cname=client_cname, expected_account_name=client_username, additional_ticket=client_service_tkt, decryption_key=service_decryption_key, expected_sid=client_sid, expected_supported_etypes=service_etypes, expected_proxy_target=service_creds.get_spn(), expected_transited_services=expected_transited_services) def test_authn_policy_allowed_to_user_not_allowed_rbcd_to_self(self): samdb = self.get_samdb() functional_level = self.get_domain_functional_level(samdb) if functional_level < dsdb.DS_DOMAIN_FUNCTION_2008: self.skipTest('RBCD requires FL2008') client_creds = self.get_cached_creds( account_type=self.AccountType.USER) client_dn = client_creds.get_dn() client_sid = self.get_objectSid(samdb, client_dn) client_tkt_options = 'forwardable' expected_flags = krb5_asn1.TicketFlags(client_tkt_options) client_tgt = self.get_tgt(client_creds, kdc_options=client_tkt_options, expected_flags=expected_flags) # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a service account. service_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER, opts={'id': 1}, use_cache=False) service_dn_str = str(service_creds.get_dn()) service_sid = self.get_objectSid(samdb, service_creds.get_dn()) service_tgt = self.get_tgt(service_creds) # Don’t set msDS-AllowedToActOnBehalfOfOtherIdentity. # Create an authentication policy that applies to a computer and # explicitly allows the client account to obtain a service ticket, # while denying the service. allowed = f'O:SYD:(A;;CR;;;{client_sid})(D;;CR;;;{service_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, computer_allowed_to=allowed) # Assign the policy to the service account. self.add_attribute(samdb, service_dn_str, 'msDS-AssignedAuthNPolicy', str(policy)) client_service_tkt = self.get_service_ticket( client_tgt, service_creds, kdc_options=client_tkt_options, expected_flags=expected_flags) kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt')) service_decryption_key = self.TicketDecryptionKey_from_creds( service_creds) # Show that obtaining a service ticket to ourselves with RBCD # is not allowed without msDS-AllowedToActOnBehalfOfOtherIdentity. self._tgs_req(service_tgt, KDC_ERR_BADOPTION, service_creds, service_creds, armor_tgt=mach_tgt, kdc_options=kdc_options, pac_options='1001', # supports claims, RBCD additional_ticket=client_service_tkt, decryption_key=service_decryption_key, expect_edata=self.expect_padata_outer, check_patypes=False) def test_authn_policy_allowed_to_computer_allow_user2user(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) client_creds = self.get_mach_creds() client_sid = self.get_objectSid(samdb, client_creds.get_dn()) client_tgt = self.get_tgt(client_creds) # Create an authentication policy that applies to a computer and # explicitly allows the user account to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{client_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, computer_allowed_to=allowed) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) target_tgt = self._get_tgt(target_creds) kdc_options = str(krb5_asn1.KDCOptions('enc-tkt-in-skey')) # Show that obtaining a service ticket with user-to-user is allowed. self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt, kdc_options=kdc_options, additional_ticket=target_tgt) def test_authn_policy_allowed_to_computer_deny_user2user(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) client_creds = self.get_mach_creds() client_sid = self.get_objectSid(samdb, client_creds.get_dn()) client_tgt = self.get_tgt(client_creds) # Create an authentication policy that applies to a computer and # explicitly denies the user account to obtain a service ticket. denied = f'O:SYD:(D;;CR;;;{client_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, computer_allowed_to=denied) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) target_tgt = self._get_tgt(target_creds) kdc_options = str(krb5_asn1.KDCOptions('enc-tkt-in-skey')) # Show that obtaining a service ticket with user-to-user is not # allowed. self._tgs_req( client_tgt, KDC_ERR_POLICY, client_creds, target_creds, armor_tgt=mach_tgt, kdc_options=kdc_options, additional_ticket=target_tgt, expect_edata=self.expect_padata_outer, # We aren’t particular about whether or not we get an NTSTATUS. expect_status=None, expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED, check_patypes=False) def test_authn_policy_allowed_to_user_derived_class_allow(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self.get_cached_creds( account_type=self.AccountType.USER) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) tgt = self.get_tgt(client_creds) # Create an authentication policy that applies to a user and explicitly # allows the user account to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{client_sid})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=allowed, computer_allowed_to=denied, service_allowed_to=denied) # Create a schema class derived from ‘user’. class_id = random.randint(0, 100000000) user_class_cn = f'my-User-Class-{class_id}' user_class = user_class_cn.replace('-', '') class_dn = samdb.get_schema_basedn() class_dn.add_child(f'CN={user_class_cn}') governs_id = f'1.3.6.1.4.1.7165.4.6.2.9.{class_id}' samdb.add({ 'dn': class_dn, 'objectClass': 'classSchema', 'subClassOf': 'user', 'governsId': governs_id, 'lDAPDisplayName': user_class, }) # Create an account derived from ‘user’ with the assigned policy. target_name = self.get_new_username() target_creds, target_dn = self.create_account( samdb, target_name, account_type=self.AccountType.USER, spn='host/{account}', additional_details={ 'msDS-AssignedAuthNPolicy': str(policy), 'objectClass': user_class, }) keys = self.get_keys(target_creds) self.creds_set_keys(target_creds, keys) # Show that obtaining a service ticket is allowed. self._tgs_req(tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_to_computer_derived_class_allow(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self.get_cached_creds( account_type=self.AccountType.USER) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) tgt = self.get_tgt(client_creds) # Create an authentication policy that applies to a computer and # explicitly allows the user account to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{client_sid})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=allowed, service_allowed_to=denied) # Create a schema class derived from ‘computer’. class_id = random.randint(0, 100000000) computer_class_cn = f'my-Computer-Class-{class_id}' computer_class = computer_class_cn.replace('-', '') class_dn = samdb.get_schema_basedn() class_dn.add_child(f'CN={computer_class_cn}') governs_id = f'1.3.6.1.4.1.7165.4.6.2.9.{class_id}' samdb.add({ 'dn': class_dn, 'objectClass': 'classSchema', 'subClassOf': 'computer', 'governsId': governs_id, 'lDAPDisplayName': computer_class, }) # Create an account derived from ‘computer’ with the assigned policy. target_name = self.get_new_username() target_creds, target_dn = self.create_account( samdb, target_name, account_type=self.AccountType.COMPUTER, spn=f'host/{target_name}', additional_details={ 'msDS-AssignedAuthNPolicy': str(policy), 'objectClass': computer_class, }) keys = self.get_keys(target_creds) self.creds_set_keys(target_creds, keys) # Show that obtaining a service ticket is allowed. self._tgs_req(tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt) def test_authn_policy_allowed_to_service_derived_class_allow(self): samdb = self.get_samdb() # Create a machine account with which to perform FAST. mach_creds = self.get_cached_creds( account_type=self.AccountType.COMPUTER) mach_tgt = self.get_tgt(mach_creds) # Create a user account. client_creds = self.get_cached_creds( account_type=self.AccountType.USER) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) tgt = self.get_tgt(client_creds) # Create an authentication policy that applies to a managed service and # explicitly allows the user account to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{client_sid})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=denied, service_allowed_to=allowed) # Create a schema class derived from ‘msDS-ManagedServiceAccount’. class_id = random.randint(0, 100000000) service_class_cn = f'my-Managed-Service-Class-{class_id}' service_class = service_class_cn.replace('-', '') class_dn = samdb.get_schema_basedn() class_dn.add_child(f'CN={service_class_cn}') governs_id = f'1.3.6.1.4.1.7165.4.6.2.9.{class_id}' samdb.add({ 'dn': class_dn, 'objectClass': 'classSchema', 'subClassOf': 'msDS-ManagedServiceAccount', 'governsId': governs_id, 'lDAPDisplayName': service_class, }) # Create an account derived from ‘msDS-ManagedServiceAccount’ with the # assigned policy. target_name = self.get_new_username() target_creds, target_dn = self.create_account( samdb, target_name, account_type=self.AccountType.MANAGED_SERVICE, spn=f'host/{target_name}', additional_details={ 'msDS-AssignedAuthNPolicy': str(policy), 'objectClass': service_class, }) # Show that obtaining a service ticket is allowed. self._tgs_req(tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt) def test_authn_policy_ntlm_allow_user(self): # Create an authentication policy allowing NTLM authentication for # users. allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_ntlm=True, user_allowed_from=allowed, service_allowed_ntlm=False, service_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy, ntlm=True) # Show that NTLM authentication succeeds. self._connect(client_creds, simple_bind=False) def test_authn_policy_ntlm_deny_user(self): # Create an authentication policy denying NTLM authentication for # users. allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_ntlm=False, user_allowed_from=allowed, service_allowed_ntlm=True, service_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy, ntlm=True) # Show that NTLM authentication fails. self._connect(client_creds, simple_bind=False, expect_error=f'{HRES_SEC_E_LOGON_DENIED:08X}') def test_authn_policy_ntlm_computer(self): # Create an authentication policy denying NTLM authentication. denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_ntlm=False, user_allowed_from=denied, service_allowed_ntlm=False, service_allowed_from=denied) # Create a computer account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy, ntlm=True) # Show that NTLM authentication succeeds. self._connect(client_creds, simple_bind=False) def test_authn_policy_ntlm_allow_service(self): # Create an authentication policy allowing NTLM authentication for # services. allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_ntlm=False, user_allowed_from=allowed, service_allowed_ntlm=True, service_allowed_from=allowed) # Create a managed service account with the assigned policy. client_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy, ntlm=True) # Show that NTLM authentication succeeds. self._connect(client_creds, simple_bind=False) def test_authn_policy_ntlm_deny_service(self): # Create an authentication policy denying NTLM authentication for # services. allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_ntlm=True, user_allowed_from=allowed, service_allowed_ntlm=False, service_allowed_from=allowed) # Create a managed service account with the assigned policy. client_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy, ntlm=True) # Show that NTLM authentication fails. self._connect(client_creds, simple_bind=False, expect_error=f'{HRES_SEC_E_LOGON_DENIED:08X}') def test_authn_policy_ntlm_deny_no_device_restrictions(self): # Create an authentication policy denying NTLM authentication for # users. policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_ntlm=False, service_allowed_ntlm=True) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy, ntlm=True) # Show that without AllowedToAuthenticateFrom set in the policy, NTLM # authentication succeeds. self._connect(client_creds, simple_bind=False) def test_authn_policy_simple_bind_allow_user(self): # Create an authentication policy allowing NTLM authentication for # users. allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_ntlm=True, user_allowed_from=allowed, service_allowed_ntlm=False, service_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy, ntlm=True) # Show that a simple bind succeeds. self._connect(client_creds, simple_bind=True) def test_authn_policy_simple_bind_deny_user(self): # Create an authentication policy denying NTLM authentication for # users. allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_ntlm=False, user_allowed_from=allowed, service_allowed_ntlm=True, service_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy, ntlm=True) # Show that a simple bind fails. self._connect(client_creds, simple_bind=True, expect_error=f'{HRES_SEC_E_INVALID_TOKEN:08X}') def test_authn_policy_simple_bind_deny_no_device_restrictions(self): # Create an authentication policy denying NTLM authentication for # users. policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_ntlm=False, service_allowed_ntlm=True) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy, ntlm=True) # Show that without AllowedToAuthenticateFrom set in the policy, a # simple bind succeeds. self._connect(client_creds, simple_bind=True) def test_authn_policy_samr_pwd_change_allow_service_allowed_from(self): # Create an authentication policy allowing NTLM authentication for # managed service accounts. allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, service_allowed_ntlm=True, service_allowed_from=allowed) # Create a managed service account with the assigned policy. client_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy, ntlm=True) # Show that a SAMR password change is allowed. self._test_samr_change_password(client_creds, expect_error=None) def test_authn_policy_samr_pwd_change_allow_service_not_allowed_from(self): # Create an authentication policy allowing NTLM authentication for # managed service accounts. denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, service_allowed_ntlm=True, service_allowed_from=denied) # Create a managed service account with the assigned policy. client_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy, ntlm=True) # Show that a SAMR password change is allowed. self._test_samr_change_password(client_creds, expect_error=None) def test_authn_policy_samr_pwd_change_allow_service_no_allowed_from(self): # Create an authentication policy allowing NTLM authentication for # managed service accounts. policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, service_allowed_ntlm=True) # Create a managed service account with the assigned policy. client_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy, ntlm=True) # Show that a SAMR password change is allowed. self._test_samr_change_password(client_creds, expect_error=None) def test_authn_policy_samr_pwd_change_deny_service_allowed_from(self): # Create an authentication policy denying NTLM authentication for # managed service accounts. allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, service_allowed_ntlm=False, service_allowed_from=allowed) # Create a managed service account with the assigned policy. client_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy, ntlm=True) # Show that the SAMR connection fails. self._test_samr_change_password( client_creds, expect_error=None, connect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION) def test_authn_policy_samr_pwd_change_deny_service_not_allowed_from(self): # Create an authentication policy denying NTLM authentication for # managed service accounts. denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, service_allowed_ntlm=False, service_allowed_from=denied) # Create a managed service account with the assigned policy. client_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy, ntlm=True) # Show that the SAMR connection fails. self._test_samr_change_password( client_creds, expect_error=None, connect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION) def test_authn_policy_samr_pwd_change_deny_service_no_allowed_from(self): # Create an authentication policy denying NTLM authentication for # managed service accounts. policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, service_allowed_ntlm=False) # Create a managed service account with the assigned policy. client_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy, ntlm=True) # Show that a SAMR password change is allowed. self._test_samr_change_password(client_creds, expect_error=None) def test_authn_policy_samlogon_allow_user(self): # Create an authentication policy allowing NTLM authentication for # users. allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_ntlm=True, user_allowed_from=allowed, service_allowed_ntlm=False, service_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy, ntlm=True) # Show that a network SamLogon succeeds. self._test_samlogon(creds=client_creds, logon_type=netlogon.NetlogonNetworkInformation) # Show that an interactive SamLogon succeeds. Although MS-APDS doesn’t # state it, AllowedNTLMNetworkAuthentication applies to interactive # logons too. self._test_samlogon(creds=client_creds, logon_type=netlogon.NetlogonInteractiveInformation) def test_authn_policy_samlogon_deny_user(self): # Create an authentication policy denying NTLM authentication for # users. allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_ntlm=False, user_allowed_from=allowed, service_allowed_ntlm=True, service_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy, ntlm=True) # Show that a network SamLogon fails. self._test_samlogon( creds=client_creds, logon_type=netlogon.NetlogonNetworkInformation, expect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION) # Show that an interactive SamLogon fails. self._test_samlogon( creds=client_creds, logon_type=netlogon.NetlogonInteractiveInformation, expect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION) def test_authn_policy_samlogon_network_computer(self): # Create an authentication policy denying NTLM authentication. denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_ntlm=False, user_allowed_from=denied, service_allowed_ntlm=False, service_allowed_from=denied) # Create a computer account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy, ntlm=True) # Show that a network SamLogon succeeds. self._test_samlogon( creds=client_creds, logon_type=netlogon.NetlogonNetworkInformation) def test_authn_policy_samlogon_interactive_allow_user_allowed_from(self): # Create an authentication policy allowing NTLM authentication for # users. allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_ntlm=True, user_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy, ntlm=True, cached=False) # Show that an interactive SamLogon succeeds. Although MS-APDS doesn’t # state it, AllowedNTLMNetworkAuthentication applies to interactive # logons too. self._test_samlogon(creds=client_creds, logon_type=netlogon.NetlogonInteractiveInformation) def test_authn_policy_samlogon_interactive_allow_user_not_allowed_from(self): # Create an authentication policy allowing NTLM authentication for # users. denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_ntlm=True, user_allowed_from=denied) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy, ntlm=True, cached=False) # Show that an interactive SamLogon succeeds. Although MS-APDS doesn’t # state it, AllowedNTLMNetworkAuthentication applies to interactive # logons too. self._test_samlogon(creds=client_creds, logon_type=netlogon.NetlogonInteractiveInformation) def test_authn_policy_samlogon_interactive_allow_user_no_allowed_from(self): # Create an authentication policy allowing NTLM authentication for # users. policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_ntlm=True) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy, ntlm=True, cached=False) # Show that an interactive SamLogon succeeds. self._test_samlogon(creds=client_creds, logon_type=netlogon.NetlogonInteractiveInformation) def test_authn_policy_samlogon_interactive_deny_user_allowed_from(self): # Create an authentication policy disallowing NTLM authentication for # users. allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_ntlm=False, user_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy, ntlm=True, cached=False) # Show that an interactive SamLogon fails. self._test_samlogon( creds=client_creds, logon_type=netlogon.NetlogonInteractiveInformation, expect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION) def test_authn_policy_samlogon_interactive_deny_user_not_allowed_from(self): # Create an authentication policy disallowing NTLM authentication for # users. denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_ntlm=False, user_allowed_from=denied) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy, ntlm=True, cached=False) # Show that an interactive SamLogon fails. self._test_samlogon( creds=client_creds, logon_type=netlogon.NetlogonInteractiveInformation, expect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION) def test_authn_policy_samlogon_interactive_deny_user_no_allowed_from(self): # Create an authentication policy disallowing NTLM authentication for # users. policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_ntlm=False) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy, ntlm=True, cached=False) # Show that an interactive SamLogon succeeds. self._test_samlogon(creds=client_creds, logon_type=netlogon.NetlogonInteractiveInformation) def test_authn_policy_samlogon_interactive_user_allowed_from(self): # Create an authentication policy not specifying whether NTLM # authentication is allowed or not. allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy, ntlm=True, cached=False) # Show that an interactive SamLogon fails. self._test_samlogon( creds=client_creds, logon_type=netlogon.NetlogonInteractiveInformation, expect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION) def test_authn_policy_samlogon_network_user_allowed_from(self): # Create an authentication policy not specifying whether NTLM # authentication is allowed or not. allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_from=allowed) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy, ntlm=True, cached=False) # Show that a network SamLogon fails. self._test_samlogon( creds=client_creds, logon_type=netlogon.NetlogonNetworkInformation, expect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION) def test_authn_policy_samlogon_network_allow_service_allowed_from(self): # Create an authentication policy allowing NTLM authentication for # services. allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, service_allowed_ntlm=True, service_allowed_from=allowed) # Create a managed service account with the assigned policy. client_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy, ntlm=True, cached=False) # Show that a network SamLogon succeeds. self._test_samlogon(creds=client_creds, logon_type=netlogon.NetlogonNetworkInformation) def test_authn_policy_samlogon_network_allow_service_not_allowed_from(self): # Create an authentication policy allowing NTLM authentication for # services. denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, service_allowed_ntlm=True, service_allowed_from=denied) # Create a managed service account with the assigned policy. client_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy, ntlm=True, cached=False) # Show that a network SamLogon succeeds. self._test_samlogon(creds=client_creds, logon_type=netlogon.NetlogonNetworkInformation) def test_authn_policy_samlogon_network_allow_service_no_allowed_from(self): # Create an authentication policy allowing NTLM authentication for # services. policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, service_allowed_ntlm=True) # Create a managed service account with the assigned policy. client_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy, ntlm=True, cached=False) # Show that a network SamLogon succeeds. self._test_samlogon(creds=client_creds, logon_type=netlogon.NetlogonNetworkInformation) def test_authn_policy_samlogon_network_deny_service_allowed_from(self): # Create an authentication policy disallowing NTLM authentication for # services. allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, service_allowed_ntlm=False, service_allowed_from=allowed) # Create a managed service account with the assigned policy. client_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy, ntlm=True, cached=False) # Show that a network SamLogon fails. self._test_samlogon( creds=client_creds, logon_type=netlogon.NetlogonNetworkInformation, expect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION) def test_authn_policy_samlogon_network_deny_service_not_allowed_from(self): # Create an authentication policy disallowing NTLM authentication for # services. denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, service_allowed_ntlm=False, service_allowed_from=denied) # Create a managed service account with the assigned policy. client_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy, ntlm=True, cached=False) # Show that a network SamLogon fails. self._test_samlogon( creds=client_creds, logon_type=netlogon.NetlogonNetworkInformation, expect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION) def test_authn_policy_samlogon_network_deny_service_no_allowed_from(self): # Create an authentication policy disallowing NTLM authentication for # services. policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, service_allowed_ntlm=False) # Create a managed service account with the assigned policy. client_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy, ntlm=True, cached=False) # Show that a network SamLogon succeeds. self._test_samlogon(creds=client_creds, logon_type=netlogon.NetlogonNetworkInformation) def test_authn_policy_samlogon_network_allow_service_allowed_from_to_self(self): # Create an authentication policy allowing NTLM authentication for # services. allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, service_allowed_ntlm=True, service_allowed_from=allowed) # Create a managed service account with the assigned policy. client_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy, ntlm=True, cached=False) # Show that a network SamLogon to ourselves succeeds. self._test_samlogon(creds=client_creds, domain_joined_mach_creds=client_creds, logon_type=netlogon.NetlogonNetworkInformation) def test_authn_policy_samlogon_network_allow_service_not_allowed_from_to_self(self): # Create an authentication policy allowing NTLM authentication for # services. denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, service_allowed_ntlm=True, service_allowed_from=denied) # Create a managed service account with the assigned policy. client_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy, ntlm=True, cached=False) # Show that a network SamLogon to ourselves succeeds. self._test_samlogon(creds=client_creds, domain_joined_mach_creds=client_creds, logon_type=netlogon.NetlogonNetworkInformation) def test_authn_policy_samlogon_network_allow_service_no_allowed_from_to_self(self): # Create an authentication policy allowing NTLM authentication for # services. policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, service_allowed_ntlm=True) # Create a managed service account with the assigned policy. client_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy, ntlm=True, cached=False) # Show that a network SamLogon to ourselves succeeds. self._test_samlogon(creds=client_creds, domain_joined_mach_creds=client_creds, logon_type=netlogon.NetlogonNetworkInformation) def test_authn_policy_samlogon_network_deny_service_allowed_from_to_self(self): # Create an authentication policy disallowing NTLM authentication for # services. allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, service_allowed_ntlm=False, service_allowed_from=allowed) # Create a managed service account with the assigned policy. client_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy, ntlm=True, cached=False) # Show that a network SamLogon to ourselves fails. self._test_samlogon( creds=client_creds, domain_joined_mach_creds=client_creds, logon_type=netlogon.NetlogonNetworkInformation, expect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION) def test_authn_policy_samlogon_network_deny_service_not_allowed_from_to_self(self): # Create an authentication policy disallowing NTLM authentication for # services. denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, service_allowed_ntlm=False, service_allowed_from=denied) # Create a managed service account with the assigned policy. client_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy, ntlm=True, cached=False) # Show that a network SamLogon to ourselves fails. self._test_samlogon( creds=client_creds, domain_joined_mach_creds=client_creds, logon_type=netlogon.NetlogonNetworkInformation, expect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION) def test_authn_policy_samlogon_network_deny_service_no_allowed_from_to_self(self): # Create an authentication policy disallowing NTLM authentication for # services. policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, service_allowed_ntlm=False) # Create a managed service account with the assigned policy. client_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy, ntlm=True, cached=False) # Show that a network SamLogon to ourselves succeeds. self._test_samlogon(creds=client_creds, domain_joined_mach_creds=client_creds, logon_type=netlogon.NetlogonNetworkInformation) def test_authn_policy_samlogon_interactive_deny_no_device_restrictions(self): # Create an authentication policy denying NTLM authentication for # users. policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_ntlm=False, service_allowed_ntlm=True) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy, ntlm=True) # Show that without AllowedToAuthenticateFrom set in the policy, an # interactive SamLogon succeeds. self._test_samlogon(creds=client_creds, logon_type=netlogon.NetlogonInteractiveInformation) def test_authn_policy_samlogon_network_deny_no_device_restrictions(self): # Create an authentication policy denying NTLM authentication for # users. policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_ntlm=False, service_allowed_ntlm=True) # Create a user account with the assigned policy. client_creds = self._get_creds(account_type=self.AccountType.USER, assigned_policy=policy, ntlm=True) # Show that without AllowedToAuthenticateFrom set in the policy, a # network SamLogon succeeds. self._test_samlogon(creds=client_creds, logon_type=netlogon.NetlogonNetworkInformation) def test_samlogon_allowed_to_computer_allow(self): samdb = self.get_samdb() # Create a user account. client_creds = self._get_creds(account_type=self.AccountType.USER, ntlm=True) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) # Create an authentication policy that applies to a computer and # explicitly allows the user account to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{client_sid})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=allowed, service_allowed_to=denied) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that a network SamLogon succeeds. self._test_samlogon(creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonNetworkInformation) # Show that an interactive SamLogon succeeds. self._test_samlogon(creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonInteractiveInformation) def test_samlogon_allowed_to_computer_deny(self): samdb = self.get_samdb() # Create a user account. client_creds = self._get_creds(account_type=self.AccountType.USER, ntlm=True) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) # Create an authentication policy that applies to a computer and # explicitly denies the user account to obtain a service ticket. denied = f'O:SYD:(D;;CR;;;{client_sid})' allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=allowed, computer_allowed_to=denied, service_allowed_to=allowed) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that a network SamLogon fails. self._test_samlogon( creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonNetworkInformation, expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED) # Show that an interactive SamLogon fails. self._test_samlogon( creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonInteractiveInformation, expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED) def test_samlogon_allowed_to_computer_deny_protected(self): samdb = self.get_samdb() # Create a protected user account. client_creds = self._get_creds(account_type=self.AccountType.USER, protected=True, ntlm=True) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) # Create an authentication policy that applies to a computer and # explicitly denies the user account to obtain a service ticket. denied = f'O:SYD:(D;;CR;;;{client_sid})' allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=allowed, computer_allowed_to=denied, service_allowed_to=allowed) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that a network SamLogon fails. self._test_samlogon( creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonNetworkInformation, expect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION) # Show that an interactive SamLogon fails. self._test_samlogon( creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonInteractiveInformation, expect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION) def test_samlogon_allowed_to_computer_allow_asserted_identity(self): # Create a user account. client_creds = self._get_creds(account_type=self.AccountType.USER, ntlm=True) # Create an authentication policy that allows accounts with the # Authentication Authority Asserted Identity SID to obtain a service # ticket. allowed = ( f'O:SYD:(A;;CR;;;' f'{security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY})' ) denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=allowed, service_allowed_to=denied) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that a network SamLogon fails. self._test_samlogon( creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonNetworkInformation, expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED) # Show that an interactive SamLogon fails. self._test_samlogon( creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonInteractiveInformation, expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED) def test_samlogon_allowed_to_computer_allow_claims_valid(self): # Create a user account. client_creds = self._get_creds(account_type=self.AccountType.USER, ntlm=True) # Create an authentication policy that allows accounts with the Claims # Valid SID to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{security.SID_CLAIMS_VALID})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=allowed, service_allowed_to=denied) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that a network SamLogon fails. self._test_samlogon( creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonNetworkInformation, expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED) # Show that an interactive SamLogon fails. self._test_samlogon( creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonInteractiveInformation, expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED) def test_samlogon_allowed_to_computer_allow_compounded_auth(self): # Create a user account. client_creds = self._get_creds(account_type=self.AccountType.USER, ntlm=True) # Create an authentication policy that allows accounts with the # Compounded Authentication SID to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{security.SID_COMPOUNDED_AUTHENTICATION})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=allowed, service_allowed_to=denied) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that a network SamLogon fails. self._test_samlogon( creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonNetworkInformation, expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED) # Show that an interactive SamLogon fails. self._test_samlogon( creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonInteractiveInformation, expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED) def test_samlogon_allowed_to_computer_allow_authenticated_users(self): # Create a user account. client_creds = self._get_creds(account_type=self.AccountType.USER, ntlm=True) # Create an authentication policy that allows accounts with the # Authenticated Users SID to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{security.SID_NT_AUTHENTICATED_USERS})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=allowed, service_allowed_to=denied) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that a network SamLogon succeeds. self._test_samlogon(creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonNetworkInformation) # Show that an interactive SamLogon succeeds. self._test_samlogon(creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonInteractiveInformation) def test_samlogon_allowed_to_computer_allow_ntlm_authn(self): # Create a user account. client_creds = self._get_creds(account_type=self.AccountType.USER, ntlm=True) # Create an authentication policy that allows accounts with the NTLM # Authentication SID to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{security.SID_NT_NTLM_AUTHENTICATION})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=allowed, service_allowed_to=denied) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that a network SamLogon fails. self._test_samlogon( creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonNetworkInformation, expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED) # Show that an interactive SamLogon fails. self._test_samlogon( creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonInteractiveInformation, expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED) def test_samlogon_allowed_to_no_owner(self): samdb = self.get_samdb() # Create a user account. client_creds = self._get_creds(account_type=self.AccountType.USER, ntlm=True) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) # Create an authentication policy that applies to a computer and # explicitly allows the user account to obtain a service ticket. Omit # the owner (O:SY) from the SDDL. allowed = f'D:(A;;CR;;;{client_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, computer_allowed_to=allowed) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that a network SamLogon fails. self._test_samlogon( creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonNetworkInformation, expect_error=ntstatus.NT_STATUS_INVALID_PARAMETER) # Show that an interactive SamLogon fails. self._test_samlogon( creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonInteractiveInformation, expect_error=ntstatus.NT_STATUS_INVALID_PARAMETER) def test_samlogon_allowed_to_no_owner_unenforced(self): samdb = self.get_samdb() # Create a user account. client_creds = self._get_creds(account_type=self.AccountType.USER, ntlm=True) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) # Create an unenforced authentication policy that applies to a computer # and explicitly allows the user account to obtain a service # ticket. Omit the owner (O:SY) from the SDDL. allowed = f'D:(A;;CR;;;{client_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=False, computer_allowed_to=allowed) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that a network SamLogon succeeds. self._test_samlogon(creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonNetworkInformation) # Show that an interactive SamLogon succeeds. self._test_samlogon(creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonInteractiveInformation) def test_samlogon_allowed_to_service_allow(self): samdb = self.get_samdb() # Create a user account. client_creds = self._get_creds(account_type=self.AccountType.USER, ntlm=True) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) # Create an authentication policy that applies to a managed service and # explicitly allows the user account to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{client_sid})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=denied, service_allowed_to=allowed) # Create a managed service account with the assigned policy. target_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy) # Show that a network SamLogon succeeds. self._test_samlogon(creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonNetworkInformation) # Show that an interactive SamLogon succeeds. self._test_samlogon(creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonInteractiveInformation) def test_samlogon_allowed_to_service_deny(self): samdb = self.get_samdb() # Create a user account. client_creds = self._get_creds(account_type=self.AccountType.USER, ntlm=True) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) # Create an authentication policy that applies to a managed service and # explicitly denies the user account to obtain a service ticket. denied = f'O:SYD:(D;;CR;;;{client_sid})' allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=allowed, computer_allowed_to=allowed, service_allowed_to=denied) # Create a managed service account with the assigned policy. target_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, assigned_policy=policy) # Show that a network SamLogon fails. self._test_samlogon( creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonNetworkInformation, expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED) # Show that an interactive SamLogon fails. self._test_samlogon( creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonInteractiveInformation, expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED) def test_samlogon_allowed_to_computer_allow_group_not_a_member(self): samdb = self.get_samdb() # Create a new group. group_name = self.get_new_username() group_dn = self.create_group(samdb, group_name) group_sid = self.get_objectSid(samdb, group_dn) # Create a user account which does not belong to the group. client_creds = self._get_creds(account_type=self.AccountType.USER, ntlm=True) # Create an authentication policy that allows accounts belonging to the # group. allowed = f'O:SYD:(A;;CR;;;{group_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, computer_allowed_to=allowed) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that a network SamLogon fails, as the user account does not # belong to the group. self._test_samlogon( creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonNetworkInformation, expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED) # Show that an interactive SamLogon fails, as the user account does not # belong to the group. self._test_samlogon( creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonInteractiveInformation, expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED) def test_samlogon_allowed_to_computer_allow_group_member(self): samdb = self.get_samdb() # Create a new group. group_name = self.get_new_username() group_dn = self.create_group(samdb, group_name) group_sid = self.get_objectSid(samdb, group_dn) # Create a user account that belongs to the group. client_creds = self._get_creds(account_type=self.AccountType.USER, member_of=group_dn, ntlm=True) # Create an authentication policy that allows accounts belonging to the # group. allowed = f'O:SYD:(A;;CR;;;{group_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, computer_allowed_to=allowed) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that a network SamLogon succeeds, since the user account belongs # to the group. self._test_samlogon(creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonNetworkInformation) # Show that an interactive SamLogon succeeds, since the user account # belongs to the group. self._test_samlogon(creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonInteractiveInformation) def test_samlogon_allowed_to_computer_allow_domain_local_group(self): samdb = self.get_samdb() # Create a new domain-local group. group_name = self.get_new_username() group_dn = self.create_group(samdb, group_name, gtype=GroupType.DOMAIN_LOCAL.value) group_sid = self.get_objectSid(samdb, group_dn) # Create a user account that belongs to the group. client_creds = self._get_creds(account_type=self.AccountType.USER, member_of=group_dn, ntlm=True) # Create an authentication policy that allows accounts belonging to the # group. allowed = f'O:SYD:(A;;CR;;;{group_sid})' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, computer_allowed_to=allowed) # Create a computer account with the assigned policy. target_creds = self._get_creds(account_type=self.AccountType.COMPUTER, assigned_policy=policy) # Show that a network SamLogon succeeds, since the user account belongs # to the group. self._test_samlogon(creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonNetworkInformation) # Show that an interactive SamLogon succeeds, since the user account # belongs to the group. self._test_samlogon(creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonInteractiveInformation) def test_samlogon_allowed_to_computer_allow_to_self(self): samdb = self.get_samdb() # Create a computer account. client_creds = self._get_creds(account_type=self.AccountType.COMPUTER, ntlm=True, cached=False) client_dn = client_creds.get_dn() client_sid = self.get_objectSid(samdb, client_dn) # Create an authentication policy that applies to a computer and # explicitly allows the user account to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{client_sid})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=allowed, service_allowed_to=denied) # Assign the policy to the account. self.add_attribute(samdb, str(client_dn), 'msDS-AssignedAuthNPolicy', str(policy)) # Show that a network SamLogon to ourselves succeeds. self._test_samlogon(creds=client_creds, domain_joined_mach_creds=client_creds, logon_type=netlogon.NetlogonNetworkInformation) def test_samlogon_allowed_to_computer_deny_to_self(self): samdb = self.get_samdb() # Create a computer account. client_creds = self._get_creds(account_type=self.AccountType.COMPUTER, ntlm=True, cached=False) client_dn = client_creds.get_dn() client_sid = self.get_objectSid(samdb, client_dn) # Create an authentication policy that applies to a computer and # explicitly denies the user account to obtain a service ticket. denied = f'O:SYD:(D;;CR;;;{client_sid})' allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=allowed, computer_allowed_to=denied, service_allowed_to=allowed) # Assign the policy to the account. self.add_attribute(samdb, str(client_dn), 'msDS-AssignedAuthNPolicy', str(policy)) # Show that a network SamLogon to ourselves fails, despite # authentication being allowed in the Kerberos case. self._test_samlogon( creds=client_creds, domain_joined_mach_creds=client_creds, logon_type=netlogon.NetlogonNetworkInformation, expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED) def test_samlogon_allowed_to_service_allow_to_self(self): samdb = self.get_samdb() # Create a managed service account. client_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, ntlm=True, cached=False) client_dn = client_creds.get_dn() client_sid = self.get_objectSid(samdb, client_dn) # Create an authentication policy that applies to a managed service and # explicitly allows the user account to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{client_sid})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=denied, service_allowed_to=allowed) # Assign the policy to the account. self.add_attribute(samdb, str(client_dn), 'msDS-AssignedAuthNPolicy', str(policy)) # Show that a network SamLogon to ourselves succeeds. self._test_samlogon(creds=client_creds, domain_joined_mach_creds=client_creds, logon_type=netlogon.NetlogonNetworkInformation) def test_samlogon_allowed_to_service_deny_to_self(self): samdb = self.get_samdb() # Create a managed service account. client_creds = self._get_creds( account_type=self.AccountType.MANAGED_SERVICE, ntlm=True, cached=False) client_dn = client_creds.get_dn() client_sid = self.get_objectSid(samdb, client_dn) # Create an authentication policy that applies to a managed service and # explicitly denies the user account to obtain a service ticket. denied = f'O:SYD:(D;;CR;;;{client_sid})' allowed = 'O:SYD:(A;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=allowed, computer_allowed_to=allowed, service_allowed_to=denied) # Assign the policy to the account. self.add_attribute(samdb, str(client_dn), 'msDS-AssignedAuthNPolicy', str(policy)) # Show that a network SamLogon to ourselves fails, despite # authentication being allowed in the Kerberos case. self._test_samlogon( creds=client_creds, domain_joined_mach_creds=client_creds, logon_type=netlogon.NetlogonNetworkInformation, expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED) def test_samlogon_allowed_to_computer_derived_class_allow(self): samdb = self.get_samdb() # Create a user account. client_creds = self._get_creds(account_type=self.AccountType.USER, ntlm=True) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) # Create an authentication policy that applies to a computer and # explicitly allows the user account to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{client_sid})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=allowed, service_allowed_to=denied) # Create a schema class derived from ‘computer’. class_id = random.randint(0, 100000000) computer_class_cn = f'my-Computer-Class-{class_id}' computer_class = computer_class_cn.replace('-', '') class_dn = samdb.get_schema_basedn() class_dn.add_child(f'CN={computer_class_cn}') governs_id = f'1.3.6.1.4.1.7165.4.6.2.9.{class_id}' samdb.add({ 'dn': class_dn, 'objectClass': 'classSchema', 'subClassOf': 'computer', 'governsId': governs_id, 'lDAPDisplayName': computer_class, }) # Create an account derived from ‘computer’ with the assigned policy. target_name = self.get_new_username() target_creds, target_dn = self.create_account( samdb, target_name, account_type=self.AccountType.COMPUTER, spn=f'host/{target_name}', additional_details={ 'msDS-AssignedAuthNPolicy': str(policy), 'objectClass': computer_class, }) keys = self.get_keys(target_creds) self.creds_set_keys(target_creds, keys) # Show that a network SamLogon succeeds. self._test_samlogon(creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonNetworkInformation) # Show that an interactive SamLogon succeeds. self._test_samlogon(creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonInteractiveInformation) def test_samlogon_allowed_to_service_derived_class_allow(self): samdb = self.get_samdb() # Create a user account. client_creds = self._get_creds(account_type=self.AccountType.USER, ntlm=True) client_sid = self.get_objectSid(samdb, client_creds.get_dn()) # Create an authentication policy that applies to a managed service and # explicitly allows the user account to obtain a service ticket. allowed = f'O:SYD:(A;;CR;;;{client_sid})' denied = 'O:SYD:(D;;CR;;;WD)' policy_id = self.get_new_username() policy = self.create_authn_policy(policy_id, enforced=True, user_allowed_to=denied, computer_allowed_to=denied, service_allowed_to=allowed) # Create a schema class derived from ‘msDS-ManagedServiceAccount’. class_id = random.randint(0, 100000000) service_class_cn = f'my-Managed-Service-Class-{class_id}' service_class = service_class_cn.replace('-', '') class_dn = samdb.get_schema_basedn() class_dn.add_child(f'CN={service_class_cn}') governs_id = f'1.3.6.1.4.1.7165.4.6.2.9.{class_id}' samdb.add({ 'dn': class_dn, 'objectClass': 'classSchema', 'subClassOf': 'msDS-ManagedServiceAccount', 'governsId': governs_id, 'lDAPDisplayName': service_class, }) # Create an account derived from ‘msDS-ManagedServiceAccount’ with the # assigned policy. target_name = self.get_new_username() target_creds, target_dn = self.create_account( samdb, target_name, account_type=self.AccountType.MANAGED_SERVICE, spn=f'host/{target_name}', additional_details={ 'msDS-AssignedAuthNPolicy': str(policy), 'objectClass': service_class, }) # Show that a network SamLogon succeeds. self._test_samlogon(creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonNetworkInformation) # Show that an interactive SamLogon succeeds. self._test_samlogon(creds=client_creds, domain_joined_mach_creds=target_creds, logon_type=netlogon.NetlogonInteractiveInformation) def check_ticket_times(self, ticket_creds, expected_life=None, expected_renew_life=None): ticket = ticket_creds.ticket_private authtime = ticket['authtime'] starttime = ticket.get('starttime', authtime) endtime = ticket['endtime'] renew_till = ticket.get('renew-till', None) starttime = self.get_EpochFromKerberosTime(starttime) if expected_life is not None: actual_end = self.get_EpochFromKerberosTime( endtime.decode('ascii')) actual_lifetime = actual_end - starttime self.assertEqual(expected_life, actual_lifetime) if renew_till is None: self.assertIsNone(expected_renew_life) else: if expected_renew_life is not None: actual_renew_till = self.get_EpochFromKerberosTime( renew_till.decode('ascii')) actual_renew_life = actual_renew_till - starttime self.assertEqual(expected_renew_life, actual_renew_life) def _get_tgt(self, creds, *, armor_tgt=None, till=None, expected_error=0, expect_status=None, expected_status=None): user_name = creds.get_username() realm = creds.get_realm() salt = creds.get_salt() cname = self.PrincipalName_create(name_type=NT_PRINCIPAL, names=user_name.split('/')) sname = self.PrincipalName_create(name_type=NT_SRV_INST, names=['krbtgt', realm]) expected_sname = self.PrincipalName_create( name_type=NT_SRV_INST, names=['krbtgt', realm.upper()]) expected_cname = cname if till is None: till = self.get_KerberosTime(offset=36000) renew_time = till krbtgt_creds = self.get_krbtgt_creds() ticket_decryption_key = ( self.TicketDecryptionKey_from_creds(krbtgt_creds)) expected_etypes = krbtgt_creds.tgs_supported_enctypes kdc_options = str(krb5_asn1.KDCOptions('renewable')) # Contrary to Microsoft’s documentation, the returned ticket is # renewable. expected_flags = krb5_asn1.TicketFlags('renewable') preauth_key = self.PasswordKey_from_creds(creds, kcrypto.Enctype.AES256) expected_realm = realm.upper() etypes = kcrypto.Enctype.AES256, kcrypto.Enctype.RC4 if armor_tgt is not None: authenticator_subkey = self.RandomKey(kcrypto.Enctype.AES256) armor_key = self.generate_armor_key(authenticator_subkey, armor_tgt.session_key) armor_subkey = authenticator_subkey client_challenge_key = self.generate_client_challenge_key( armor_key, preauth_key) enc_challenge_padata = self.get_challenge_pa_data( client_challenge_key) def generate_fast_padata_fn(kdc_exchange_dict, _callback_dict, req_body): return [enc_challenge_padata], req_body generate_fast_fn = self.generate_simple_fast generate_fast_armor_fn = self.generate_ap_req generate_padata_fn = None fast_armor_type = FX_FAST_ARMOR_AP_REQUEST else: ts_enc_padata = self.get_enc_timestamp_pa_data_from_key( preauth_key) def generate_padata_fn(kdc_exchange_dict, _callback_dict, req_body): return [ts_enc_padata], req_body generate_fast_fn = None generate_fast_padata_fn = None generate_fast_armor_fn = None armor_key = None armor_subkey = None fast_armor_type = None if not expected_error: check_error_fn = None check_rep_fn = self.generic_check_kdc_rep else: check_error_fn = self.generic_check_kdc_error check_rep_fn = None kdc_exchange_dict = self.as_exchange_dict( creds=creds, expected_error_mode=expected_error, expect_status=expect_status, expected_status=expected_status, expected_crealm=expected_realm, expected_cname=expected_cname, expected_srealm=expected_realm, expected_sname=expected_sname, expected_salt=salt, expected_flags=expected_flags, expected_supported_etypes=expected_etypes, generate_padata_fn=generate_padata_fn, generate_fast_padata_fn=generate_fast_padata_fn, generate_fast_fn=generate_fast_fn, generate_fast_armor_fn=generate_fast_armor_fn, fast_armor_type=fast_armor_type, check_error_fn=check_error_fn, check_rep_fn=check_rep_fn, check_kdc_private_fn=self.generic_check_kdc_private, armor_key=armor_key, armor_tgt=armor_tgt, armor_subkey=armor_subkey, kdc_options=kdc_options, preauth_key=preauth_key, ticket_decryption_key=ticket_decryption_key, # PA-DATA types are not important for these tests. check_patypes=False) rep = self._generic_kdc_exchange(kdc_exchange_dict, cname=cname, realm=realm, sname=sname, till_time=till, renew_time=renew_time, etypes=etypes) if expected_error: self.check_error_rep(rep, expected_error) return None self.check_as_reply(rep) ticket_creds = kdc_exchange_dict['rep_ticket_creds'] return ticket_creds if __name__ == '__main__': global_asn1_print = False global_hexdump = False import unittest unittest.main()