summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorJoseph Sutton <josephsutton@catalyst.net.nz>2023-05-04 13:53:06 +1200
committerAndrew Bartlett <abartlet@samba.org>2023-05-18 01:03:37 +0000
commit84a7ae8e0c7730e03161d69b5ca55436cfc5b066 (patch)
tree4f90edb88846f4e85d4381fa7304a92557cc1b3f /python
parentf9b666297cbbe5d475b570a9d268df1f3fce048e (diff)
downloadsamba-84a7ae8e0c7730e03161d69b5ca55436cfc5b066.tar.gz
tests/krb5: Add tests for authentication policies
Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz> Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Diffstat (limited to 'python')
-rwxr-xr-xpython/samba/tests/krb5/authn_policy_tests.py6589
-rw-r--r--python/samba/tests/krb5/rfc4120_constants.py1
-rw-r--r--python/samba/tests/usage.py1
3 files changed, 6591 insertions, 0 deletions
diff --git a/python/samba/tests/krb5/authn_policy_tests.py b/python/samba/tests/krb5/authn_policy_tests.py
new file mode 100755
index 00000000000..6182388f262
--- /dev/null
+++ b/python/samba/tests/krb5/authn_policy_tests.py
@@ -0,0 +1,6589 @@
+#!/usr/bin/env python3
+# Unix SMB/CIFS implementation.
+# Copyright (C) Stefan Metzmacher 2020
+# Copyright (C) Catalyst.Net Ltd 2023
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+import sys
+import os
+
+sys.path.insert(0, 'bin/python')
+os.environ['PYTHONUNBUFFERED'] = '1'
+
+import random
+
+import ldb
+
+from samba import dsdb, ntstatus
+from samba.dcerpc import netlogon, security
+from samba.ndr import ndr_pack
+
+import samba.tests.krb5.kcrypto as kcrypto
+from samba.tests.krb5.kdc_base_test import GroupType
+from samba.tests.krb5.kdc_tgs_tests import KdcTgsBaseTests
+from samba.tests.krb5.rfc4120_constants import (
+ FX_FAST_ARMOR_AP_REQUEST,
+ KDC_ERR_BADOPTION,
+ KDC_ERR_GENERIC,
+ KDC_ERR_NEVER_VALID,
+ KDC_ERR_POLICY,
+ NT_PRINCIPAL,
+ NT_SRV_INST,
+)
+import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
+
+global_asn1_print = False
+global_hexdump = False
+
+HRES_SEC_E_INVALID_TOKEN = 0x80090308
+HRES_SEC_E_LOGON_DENIED = 0x8009030C
+
+
+class AuthnPolicyTests(KdcTgsBaseTests):
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+
+ cls._max_ticket_life = None
+ cls._max_renew_life = None
+
+ def setUp(self):
+ super().setUp()
+ self.do_asn1_print = global_asn1_print
+ self.do_hexdump = global_hexdump
+
+ def get_max_ticket_life(self):
+ if self._max_ticket_life is None:
+ self._fetch_default_lifetimes()
+
+ return self._max_ticket_life
+
+ def get_max_renew_life(self):
+ if self._max_renew_life is None:
+ self._fetch_default_lifetimes()
+
+ return self._max_renew_life
+
+ def _fetch_default_lifetimes(self):
+ samdb = self.get_samdb()
+
+ domain_policy_dn = samdb.get_default_basedn()
+ domain_policy_dn.add_child('CN=Default Domain Policy,CN=System')
+
+ res = samdb.search(domain_policy_dn,
+ scope=ldb.SCOPE_BASE,
+ attrs=['maxTicketAge', 'maxRenewAge'])
+ self.assertEqual(1, len(res))
+
+ max_ticket_age = res[0].get('maxTicketAge', idx=0)
+ max_renew_age = res[0].get('maxRenewAge', idx=0)
+
+ if max_ticket_age is not None:
+ max_ticket_age = int(max_ticket_age.decode('utf-8'))
+ else:
+ max_ticket_age = 10
+
+ if max_renew_age is not None:
+ max_renew_age = int(max_renew_age.decode('utf-8'))
+ else:
+ max_renew_age = 7
+
+ type(self)._max_ticket_life = max_ticket_age * 60 * 60
+ type(self)._max_renew_life = max_renew_age * 24 * 60 * 60
+
+ # Get account credentials for testing.
+ def _get_creds(self,
+ account_type=KdcTgsBaseTests.AccountType.USER,
+ member_of=None,
+ protected=False,
+ assigned_policy=None,
+ assigned_silo=None,
+ ntlm=False,
+ spn=None,
+ allowed_rodc=None,
+ cached=True):
+ opts = {
+ 'kerberos_enabled': not ntlm,
+ 'spn': spn,
+ }
+
+ members = ()
+ if protected:
+ samdb = self.get_samdb()
+ protected_users_group = (f'<SID={samdb.get_domain_sid()}-'
+ f'{security.DOMAIN_RID_PROTECTED_USERS}>')
+ members += (protected_users_group,)
+ if member_of is not None:
+ members += (member_of,)
+ if assigned_policy is not None:
+ opts['assigned_policy'] = str(assigned_policy)
+ if assigned_silo is not None:
+ opts['assigned_silo'] = str(assigned_silo)
+ if allowed_rodc:
+ opts['allowed_replication_mock'] = True
+ opts['revealed_to_mock_rodc'] = True
+
+ if members:
+ opts['member_of'] = members
+
+ return self.get_cached_creds(account_type=account_type,
+ opts=opts,
+ use_cache=cached)
+
+ def test_authn_policy_tgt_lifetime_user(self):
+ # Create an authentication policy with certain TGT lifetimes set.
+ user_life = 111
+ computer_life = 222
+ service_life = 333
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=user_life,
+ computer_tgt_lifetime=computer_life,
+ service_tgt_lifetime=service_life)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Request a Kerberos ticket with a lifetime of two hours, and assert
+ # that the actual lifetime matches the user lifetime set in the policy.
+ till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=user_life,
+ expected_renew_life=user_life)
+
+ def test_authn_policy_tgt_lifetime_computer(self):
+ user_life = 111
+ computer_life = 222
+ service_life = 333
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=user_life,
+ computer_tgt_lifetime=computer_life,
+ service_tgt_lifetime=service_life)
+
+ # Create a computer account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Request a Kerberos ticket with a lifetime of two hours, and assert
+ # that the actual lifetime matches the computer lifetime set in the
+ # policy.
+ till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=computer_life,
+ expected_renew_life=computer_life)
+
+ def test_authn_policy_tgt_lifetime_service(self):
+ user_life = 111
+ computer_life = 222
+ service_life = 333
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=user_life,
+ computer_tgt_lifetime=computer_life,
+ service_tgt_lifetime=service_life)
+
+ # Create a managed service account with the assigned policy.
+ client_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy)
+
+ # Request a Kerberos ticket with a lifetime of two hours, and assert
+ # that the actual lifetime matches the service lifetime set in the
+ # policy.
+ till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=service_life,
+ expected_renew_life=service_life)
+
+ def test_authn_silo_tgt_lifetime_user(self):
+ # Create an authentication policy with certain TGT lifetimes set.
+ user_life = 111
+ computer_life = 222
+ service_life = 333
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=user_life,
+ computer_tgt_lifetime=computer_life,
+ service_tgt_lifetime=service_life)
+
+ # Create a second policy with different lifetimes, so we can verify the
+ # correct policy is enforced.
+ wrong_policy_id = self.get_new_username()
+ wrong_policy = self.create_authn_policy(wrong_policy_id,
+ enforced=True,
+ user_tgt_lifetime=444,
+ computer_tgt_lifetime=555,
+ service_tgt_lifetime=666)
+
+ # Create an authentication silo with our existing policies.
+ silo_id = self.get_new_username()
+ silo = self.create_authn_silo(silo_id,
+ user_policy=str(policy),
+ computer_policy=str(wrong_policy),
+ service_policy=str(wrong_policy),
+ enforced=True)
+
+ # Create a user account assigned to the silo.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_silo=silo)
+ client_dn_str = str(client_creds.get_dn())
+
+ # Add the user to the silo as a member.
+ self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers',
+ expect_attr=False)
+
+ # Request a Kerberos ticket with a lifetime of two hours, and assert
+ # that the actual lifetime matches the user lifetime set in the
+ # appropriate policy.
+ till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=user_life,
+ expected_renew_life=user_life)
+
+ def test_authn_silo_tgt_lifetime_computer(self):
+ user_life = 111
+ computer_life = 222
+ service_life = 333
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=user_life,
+ computer_tgt_lifetime=computer_life,
+ service_tgt_lifetime=service_life)
+
+ wrong_policy_id = self.get_new_username()
+ wrong_policy = self.create_authn_policy(wrong_policy_id,
+ enforced=True,
+ user_tgt_lifetime=444,
+ computer_tgt_lifetime=555,
+ service_tgt_lifetime=666)
+
+ # Create an authentication silo with our existing policies.
+ silo_id = self.get_new_username()
+ silo = self.create_authn_silo(silo_id,
+ user_policy=str(wrong_policy),
+ computer_policy=str(policy),
+ service_policy=str(wrong_policy),
+ enforced=True)
+
+ # Create a computer account assigned to the silo.
+ client_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_silo=silo)
+ client_dn_str = str(client_creds.get_dn())
+
+ # Add the computer to the silo as a member.
+ self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers',
+ expect_attr=False)
+
+ # Request a Kerberos ticket with a lifetime of two hours, and assert
+ # that the actual lifetime matches the computer lifetime set in the
+ # appropriate policy.
+ till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=computer_life,
+ expected_renew_life=computer_life)
+
+ def test_authn_silo_tgt_lifetime_service(self):
+ user_life = 111
+ computer_life = 222
+ service_life = 333
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=user_life,
+ computer_tgt_lifetime=computer_life,
+ service_tgt_lifetime=service_life)
+
+ wrong_policy_id = self.get_new_username()
+ wrong_policy = self.create_authn_policy(wrong_policy_id,
+ enforced=True,
+ user_tgt_lifetime=444,
+ computer_tgt_lifetime=555,
+ service_tgt_lifetime=666)
+
+ # Create an authentication silo with our existing policies.
+ silo_id = self.get_new_username()
+ silo = self.create_authn_silo(silo_id,
+ user_policy=str(wrong_policy),
+ computer_policy=str(wrong_policy),
+ service_policy=str(policy),
+ enforced=True)
+
+ # Create a managed service account assigned to the silo.
+ client_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_silo=silo)
+ client_dn_str = str(client_creds.get_dn())
+
+ # Add the managed service account to the silo as a member.
+ self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers',
+ expect_attr=False)
+
+ # Request a Kerberos ticket with a lifetime of two hours, and assert
+ # that the actual lifetime matches the service lifetime set in the
+ # appropriate policy.
+ till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=service_life,
+ expected_renew_life=service_life)
+
+ # Test that an authentication silo takes priority over a policy assigned
+ # directly.
+ def test_authn_silo_and_policy_tgt_lifetime_user(self):
+ # Create an authentication policy with certain TGT lifetimes set.
+ user_life = 111
+ computer_life = 222
+ service_life = 333
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=user_life,
+ computer_tgt_lifetime=computer_life,
+ service_tgt_lifetime=service_life)
+
+ # Create a second policy with different lifetimes, so we can verify the
+ # correct policy is enforced.
+ wrong_policy_id = self.get_new_username()
+ wrong_policy = self.create_authn_policy(wrong_policy_id,
+ enforced=True,
+ user_tgt_lifetime=444,
+ computer_tgt_lifetime=555,
+ service_tgt_lifetime=666)
+
+ # Create an authentication silo with our existing policies.
+ silo_id = self.get_new_username()
+ silo = self.create_authn_silo(silo_id,
+ user_policy=str(policy),
+ computer_policy=str(wrong_policy),
+ service_policy=str(wrong_policy),
+ enforced=True)
+
+ # Create a user account assigned to the silo, and also to a policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_silo=silo,
+ assigned_policy=wrong_policy)
+ client_dn_str = str(client_creds.get_dn())
+
+ # Add the user to the silo as a member.
+ self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers',
+ expect_attr=False)
+
+ # Request a Kerberos ticket with a lifetime of two hours, and assert
+ # that the actual lifetime matches the user lifetime set in the
+ # appropriate policy.
+ till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=user_life,
+ expected_renew_life=user_life)
+
+ def test_authn_silo_and_policy_tgt_lifetime_computer(self):
+ user_life = 111
+ computer_life = 222
+ service_life = 333
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=user_life,
+ computer_tgt_lifetime=computer_life,
+ service_tgt_lifetime=service_life)
+
+ wrong_policy_id = self.get_new_username()
+ wrong_policy = self.create_authn_policy(wrong_policy_id,
+ enforced=True,
+ user_tgt_lifetime=444,
+ computer_tgt_lifetime=555,
+ service_tgt_lifetime=666)
+
+ # Create an authentication silo with our existing policies.
+ silo_id = self.get_new_username()
+ silo = self.create_authn_silo(silo_id,
+ user_policy=str(wrong_policy),
+ computer_policy=str(policy),
+ service_policy=str(wrong_policy),
+ enforced=True)
+
+ # Create a computer account assigned to the silo, and also to a policy.
+ client_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_silo=silo,
+ assigned_policy=wrong_policy)
+ client_dn_str = str(client_creds.get_dn())
+
+ # Add the computer to the silo as a member.
+ self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers',
+ expect_attr=False)
+
+ # Request a Kerberos ticket with a lifetime of two hours, and assert
+ # that the actual lifetime matches the computer lifetime set in the
+ # appropriate policy.
+ till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=computer_life,
+ expected_renew_life=computer_life)
+
+ def test_authn_silo_and_policy_tgt_lifetime_service(self):
+ user_life = 111
+ computer_life = 222
+ service_life = 333
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=user_life,
+ computer_tgt_lifetime=computer_life,
+ service_tgt_lifetime=service_life)
+
+ wrong_policy_id = self.get_new_username()
+ wrong_policy = self.create_authn_policy(wrong_policy_id,
+ enforced=True,
+ user_tgt_lifetime=444,
+ computer_tgt_lifetime=555,
+ service_tgt_lifetime=666)
+
+ # Create an authentication silo with our existing policies.
+ silo_id = self.get_new_username()
+ silo = self.create_authn_silo(silo_id,
+ user_policy=str(wrong_policy),
+ computer_policy=str(wrong_policy),
+ service_policy=str(policy),
+ enforced=True)
+
+ # Create a managed service account assigned to the silo, and also to a
+ # policy.
+ client_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_silo=silo,
+ assigned_policy=wrong_policy)
+ client_dn_str = str(client_creds.get_dn())
+
+ # Add the managed service account to the silo as a member.
+ self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers',
+ expect_attr=False)
+
+ # Request a Kerberos ticket with a lifetime of two hours, and assert
+ # that the actual lifetime matches the service lifetime set in the
+ # appropriate policy.
+ till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=service_life,
+ expected_renew_life=service_life)
+
+ def test_authn_policy_tgt_lifetime_max(self):
+ # Create an authentication policy with the maximum allowable TGT
+ # lifetime set.
+ INT64_MAX = 0x7fff_ffff_ffff_ffff
+ max_lifetime = INT64_MAX // 10_000_000
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=max_lifetime)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Request a Kerberos ticket with a ‘till’ time far in the
+ # future, and assert that the actual lifetime is the maximum
+ # allowed by the Default Domain policy.
+ till = '99991231235959Z'
+ expected_lifetime = self.get_max_ticket_life()
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=expected_lifetime,
+ expected_renew_life=expected_lifetime)
+
+ def test_authn_policy_tgt_lifetime_min(self):
+ # Create an authentication policy with the minimum allowable TGT
+ # lifetime set.
+ INT64_MIN = -0x8000_0000_0000_0000
+ min_lifetime = round(INT64_MIN / 10_000_000)
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=min_lifetime)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Request a Kerberos ticket with a lifetime of two hours. The request
+ # should fail with a NEVER_VALID error.
+ till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours
+ self._get_tgt(client_creds, till=till,
+ expected_error=KDC_ERR_NEVER_VALID,
+ expect_status=True,
+ expected_status=ntstatus.NT_STATUS_TIME_DIFFERENCE_AT_DC)
+
+ def test_authn_policy_tgt_lifetime_zero(self):
+ # Create an authentication policy with the TGT lifetime set to zero.
+ lifetime = 0
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=lifetime)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Request a Kerberos ticket with a ‘till’ time far in the
+ # future. Assert that the actual lifetime is the maximum
+ # allowed by the Default Domain Policy
+ till = '99991231235959Z'
+ expected_lifetime = self.get_max_ticket_life()
+ expected_renew_life = self.get_max_renew_life()
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=expected_lifetime,
+ expected_renew_life=expected_renew_life)
+
+ def test_authn_policy_tgt_lifetime_one_second(self):
+ # Create an authentication policy with the TGT lifetime set to one
+ # second.
+ lifetime = 1
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=lifetime)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Request a Kerberos ticket with a lifetime of two hours, and assert
+ # that the actual lifetime matches the user lifetime set in the
+ # appropriate policy.
+ till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=lifetime,
+ expected_renew_life=lifetime)
+
+ def test_authn_policy_tgt_lifetime_kpasswd_lifetime(self):
+ # Create an authentication policy with the TGT lifetime set to two
+ # minutes (the lifetime of a kpasswd ticket).
+ lifetime = 2 * 60
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=lifetime)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Request a Kerberos ticket with a lifetime of two hours, and assert
+ # that the actual lifetime matches the user lifetime set in the
+ # appropriate policy.
+ till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=lifetime,
+ expected_renew_life=lifetime)
+
+ def test_authn_policy_tgt_lifetime_short_protected(self):
+ # Create an authentication policy with a short TGT lifetime set.
+ lifetime = 111
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=lifetime)
+
+ # Create a user account with the assigned policy, belonging to the
+ # Protected Users group.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ protected=True,
+ assigned_policy=policy)
+
+ # Request a Kerberos ticket with a lifetime of two hours, and assert
+ # that the actual lifetime matches the user lifetime set in the policy.
+ till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=lifetime,
+ expected_renew_life=lifetime)
+
+ def test_authn_policy_tgt_lifetime_long_protected(self):
+ # Create an authentication policy with a long TGT lifetime set. This
+ # exceeds the lifetime of four hours enforced by Protected Users.
+ lifetime = 6 * 60 * 60 # 6 hours
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=lifetime)
+
+ # Create a user account with the assigned policy, belonging to the
+ # Protected Users group.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ protected=True,
+ assigned_policy=policy)
+
+ # Request a Kerberos ticket with a lifetime of eight hours, and assert
+ # that the actual lifetime matches the user lifetime set in the policy,
+ # taking precedence over the lifetime enforced by Protected Users.
+ till = self.get_KerberosTime(offset=8 * 60 * 60) # 8 hours
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=lifetime,
+ expected_renew_life=lifetime)
+
+ def test_authn_policy_tgt_lifetime_zero_protected(self):
+ # Create an authentication policy with the TGT lifetime set to zero.
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=0)
+
+ # Create a user account with the assigned policy, belonging to the
+ # Protected Users group.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ protected=True,
+ assigned_policy=policy)
+
+ # Request a Kerberos ticket with a lifetime of six hours, and assert
+ # that the actual lifetime is the four hours enforced by Protected
+ # Users.
+ till = self.get_KerberosTime(offset=6 * 60 * 60) # 6 hours
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=4 * 60 * 60,
+ expected_renew_life=4 * 60 * 60)
+
+ def test_authn_policy_tgt_lifetime_none_protected(self):
+ # Create an authentication policy with no TGT lifetime set.
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True)
+
+ # Create a user account with the assigned policy, belonging to the
+ # Protected Users group.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ protected=True,
+ assigned_policy=policy)
+
+ # Request a Kerberos ticket with a lifetime of six hours, and assert
+ # that the actual lifetime is the four hours enforced by Protected
+ # Users.
+ till = self.get_KerberosTime(offset=6 * 60 * 60) # 6 hours
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=4 * 60 * 60,
+ expected_renew_life=4 * 60 * 60)
+
+ def test_authn_policy_tgt_lifetime_unenforced_protected(self):
+ # Create an unenforced authentication policy with a TGT lifetime set.
+ lifetime = 123
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=False,
+ user_tgt_lifetime=lifetime)
+
+ # Create a user account with the assigned policy, belonging to the
+ # Protected Users group.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ protected=True,
+ assigned_policy=policy)
+
+ # Request a Kerberos ticket with a lifetime of six hours, and assert
+ # that the actual lifetime is the four hours enforced by Protected
+ # Users.
+ till = self.get_KerberosTime(offset=6 * 60 * 60) # 6 hours
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=4 * 60 * 60,
+ expected_renew_life=4 * 60 * 60)
+
+ def test_authn_policy_not_enforced(self):
+ # Create an authentication policy with the TGT lifetime set. The policy
+ # is not enforced.
+ lifetime = 123
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ user_tgt_lifetime=lifetime)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Request a Kerberos ticket with a ‘till’ time far in the
+ # future. Assert that the actual lifetime is the maximum allowed by
+ # the Default Domain Policy.
+ till = '99991231235959Z'
+ expected_lifetime = self.get_max_ticket_life()
+ expected_renew_life = self.get_max_renew_life()
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=expected_lifetime,
+ expected_renew_life=expected_renew_life)
+
+ def test_authn_policy_unenforced(self):
+ # Create an authentication policy with the TGT lifetime set. The policy
+ # is set to be unenforced.
+ lifetime = 123
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=False,
+ user_tgt_lifetime=lifetime)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Request a Kerberos ticket with a ‘till’ time far in the
+ # future. Assert that the actual lifetime is the maximum allowed by
+ # the Default Domain Policy.
+ till = '99991231235959Z'
+ expected_lifetime = self.get_max_ticket_life()
+ expected_renew_life = self.get_max_renew_life()
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=expected_lifetime,
+ expected_renew_life=expected_renew_life)
+
+ def test_authn_silo_not_enforced(self):
+ # Create an authentication policy with the TGT lifetime set.
+ lifetime = 123
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=lifetime)
+
+ # Create an authentication silo with our existing policy. The silo is
+ # not enforced.
+ silo_id = self.get_new_username()
+ silo = self.create_authn_silo(silo_id,
+ user_policy=str(policy))
+
+ # Create a user account assigned to the silo.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_silo=silo)
+ client_dn_str = str(client_creds.get_dn())
+
+ # Add the user to the silo as a member.
+ self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers',
+ expect_attr=False)
+
+ # Request a Kerberos ticket with a ‘till’ time far in the
+ # future. Assert that the actual lifetime is the maximum allowed by
+ # the Default Domain Policy.
+ till = '99991231235959Z'
+ expected_lifetime = self.get_max_ticket_life()
+ expected_renew_life = self.get_max_renew_life()
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=expected_lifetime,
+ expected_renew_life=expected_renew_life)
+
+ def test_authn_silo_unenforced(self):
+ # Create an authentication policy with the TGT lifetime set.
+ lifetime = 123
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=lifetime)
+
+ # Create an authentication silo with our existing policy. The silo is
+ # set to be unenforced.
+ silo_id = self.get_new_username()
+ silo = self.create_authn_silo(silo_id,
+ user_policy=str(policy),
+ enforced=False)
+
+ # Create a user account assigned to the silo.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_silo=silo)
+ client_dn_str = str(client_creds.get_dn())
+
+ # Add the user to the silo as a member.
+ self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers',
+ expect_attr=False)
+
+ # Request a Kerberos ticket with a ‘till’ time far in the
+ # future. Assert that the actual lifetime is the maximum allowed by
+ # the Default Domain Policy.
+ till = '99991231235959Z'
+ expected_lifetime = self.get_max_ticket_life()
+ expected_renew_life = self.get_max_renew_life()
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=expected_lifetime,
+ expected_renew_life=expected_renew_life)
+
+ def test_authn_silo_not_enforced_policy(self):
+ # Create an authentication policy with the TGT lifetime set. The policy
+ # is not enforced.
+ lifetime = 123
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ user_tgt_lifetime=lifetime)
+
+ # Create an authentication silo with our existing policy.
+ silo_id = self.get_new_username()
+ silo = self.create_authn_silo(silo_id,
+ user_policy=str(policy),
+ enforced=True)
+
+ # Create a user account assigned to the silo.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_silo=silo)
+ client_dn_str = str(client_creds.get_dn())
+
+ # Add the user to the silo as a member.
+ self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers',
+ expect_attr=False)
+
+ # Request a Kerberos ticket with a lifetime of two hours. Despite the
+ # fact that the policy is unenforced, the actual lifetime matches the
+ # user lifetime set in the appropriate policy.
+ till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=lifetime,
+ expected_renew_life=lifetime)
+
+ def test_authn_silo_unenforced_policy(self):
+ # Create an authentication policy with the TGT lifetime set. The policy
+ # is set to be unenforced.
+ lifetime = 123
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=False,
+ user_tgt_lifetime=lifetime)
+
+ # Create an authentication silo with our existing policy.
+ silo_id = self.get_new_username()
+ silo = self.create_authn_silo(silo_id,
+ user_policy=str(policy),
+ enforced=True)
+
+ # Create a user account assigned to the silo.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_silo=silo)
+ client_dn_str = str(client_creds.get_dn())
+
+ # Add the user to the silo as a member.
+ self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers',
+ expect_attr=False)
+
+ # Request a Kerberos ticket with a lifetime of two hours. Despite the
+ # fact that the policy is unenforced, the actual lifetime matches the
+ # user lifetime set in the appropriate policy.
+ till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=lifetime,
+ expected_renew_life=lifetime)
+
+ def test_authn_silo_not_enforced_and_assigned_policy(self):
+ # Create an authentication policy with the TGT lifetime set.
+ silo_lifetime = 123
+ silo_policy_id = self.get_new_username()
+ silo_policy = self.create_authn_policy(silo_policy_id,
+ enforced=True,
+ user_tgt_lifetime=silo_lifetime)
+
+ # Create an authentication silo with our existing policy. The silo is
+ # not enforced.
+ silo_id = self.get_new_username()
+ silo = self.create_authn_silo(silo_id,
+ user_policy=str(silo_policy))
+
+ # Create a second policy with a different lifetime, so we can verify
+ # the correct policy is enforced.
+ lifetime = 456
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=lifetime)
+
+ # Create a user account assigned to the silo, and also to the policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_silo=silo,
+ assigned_policy=policy)
+ client_dn_str = str(client_creds.get_dn())
+
+ # Add the user to the silo as a member.
+ self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers',
+ expect_attr=False)
+
+ # Request a Kerberos ticket with a ‘till’ time far in the
+ # future. Assert that the actual lifetime is the maximum
+ # allowed by the Default Domain Policy. The directly-assigned
+ # policy is not enforced.
+ till = '99991231235959Z'
+ expected_lifetime = self.get_max_ticket_life()
+ expected_renew_life = self.get_max_renew_life()
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=expected_lifetime,
+ expected_renew_life=expected_renew_life)
+
+ def test_authn_silo_unenforced_and_assigned_policy(self):
+ # Create an authentication policy with the TGT lifetime set.
+ silo_lifetime = 123
+ silo_policy_id = self.get_new_username()
+ silo_policy = self.create_authn_policy(silo_policy_id,
+ enforced=True,
+ user_tgt_lifetime=silo_lifetime)
+
+ # Create an authentication silo with our existing policy. The silo is
+ # set to be unenforced.
+ silo_id = self.get_new_username()
+ silo = self.create_authn_silo(silo_id,
+ user_policy=str(silo_policy),
+ enforced=False)
+
+ # Create a second policy with a different lifetime, so we can verify
+ # the correct policy is enforced.
+ lifetime = 456
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=lifetime)
+
+ # Create a user account assigned to the silo, and also to the policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_silo=silo,
+ assigned_policy=policy)
+ client_dn_str = str(client_creds.get_dn())
+
+ # Add the user to the silo as a member.
+ self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers',
+ expect_attr=False)
+
+ # Request a Kerberos ticket with a ‘till’ time far in the
+ # future. Assert that the actual lifetime is the maximum
+ # allowed by the Default Domain Policy. The directly-assigned
+ # policy is not enforced.
+ till = '99991231235959Z'
+ expected_lifetime = self.get_max_ticket_life()
+ expected_renew_life = self.get_max_renew_life()
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=expected_lifetime,
+ expected_renew_life=expected_renew_life)
+
+ def test_authn_silo_not_enforced_policy_and_assigned_policy(self):
+ # Create an authentication policy with the TGT lifetime set. The policy
+ # is not enforced.
+ silo_lifetime = 123
+ silo_policy_id = self.get_new_username()
+ silo_policy = self.create_authn_policy(silo_policy_id,
+ user_tgt_lifetime=silo_lifetime)
+
+ # Create an authentication silo with our existing policy.
+ silo_id = self.get_new_username()
+ silo = self.create_authn_silo(silo_id,
+ user_policy=str(silo_policy),
+ enforced=True)
+
+ # Create a second policy with a different lifetime, so we can verify
+ # the correct policy is enforced.
+ lifetime = 456
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=lifetime)
+
+ # Create a user account assigned to the silo, and also to the policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_silo=silo,
+ assigned_policy=policy)
+ client_dn_str = str(client_creds.get_dn())
+
+ # Add the user to the silo as a member.
+ self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers',
+ expect_attr=False)
+
+ # Request a Kerberos ticket with a lifetime of two hours. Despite the
+ # fact that the policy is unenforced, the actual lifetime matches the
+ # user lifetime set in the appropriate policy. The directly-assigned
+ # policy is not enforced.
+ till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=silo_lifetime,
+ expected_renew_life=silo_lifetime)
+
+ def test_authn_silo_unenforced_policy_and_assigned_policy(self):
+ # Create an authentication policy with the TGT lifetime set. The policy
+ # is set to be unenforced.
+ silo_lifetime = 123
+ silo_policy_id = self.get_new_username()
+ silo_policy = self.create_authn_policy(silo_policy_id,
+ enforced=False,
+ user_tgt_lifetime=silo_lifetime)
+
+ # Create an authentication silo with our existing policy.
+ silo_id = self.get_new_username()
+ silo = self.create_authn_silo(silo_id,
+ user_policy=str(silo_policy),
+ enforced=True)
+
+ # Create a second policy with a different lifetime, so we can verify
+ # the correct policy is enforced.
+ lifetime = 456
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=lifetime)
+
+ # Create a user account assigned to the silo, and also to the policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_silo=silo,
+ assigned_policy=policy)
+ client_dn_str = str(client_creds.get_dn())
+
+ # Add the user to the silo as a member.
+ self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers',
+ expect_attr=False)
+
+ # Request a Kerberos ticket with a lifetime of two hours. Despite the
+ # fact that the policy is unenforced, the actual lifetime matches the
+ # user lifetime set in the appropriate policy. The directly-assigned
+ # policy is not enforced.
+ till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=silo_lifetime,
+ expected_renew_life=silo_lifetime)
+
+ def test_authn_silo_not_a_member(self):
+ # Create an authentication policy with the TGT lifetime set.
+ lifetime = 123
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=lifetime)
+
+ # Create an authentication silo with our existing policy.
+ silo_id = self.get_new_username()
+ silo = self.create_authn_silo(silo_id,
+ user_policy=str(policy),
+ enforced=True)
+
+ # Create a user account assigned to the silo.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_silo=silo)
+
+ # Do not add the user to the silo as a member.
+
+ # Request a Kerberos ticket with a ‘till’ time far in the
+ # future. Assert that the actual lifetime is the maximum allowed by
+ # the Default Domain Policy.
+ till = '99991231235959Z'
+ expected_lifetime = self.get_max_ticket_life()
+ expected_renew_life = self.get_max_renew_life()
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=expected_lifetime,
+ expected_renew_life=expected_renew_life)
+
+ def test_authn_silo_not_a_member_and_assigned_policy(self):
+ # Create an authentication policy with the TGT lifetime set.
+ silo_lifetime = 123
+ silo_policy_id = self.get_new_username()
+ silo_policy = self.create_authn_policy(silo_policy_id,
+ enforced=True,
+ user_tgt_lifetime=silo_lifetime)
+
+ # Create an authentication silo with our existing policy.
+ silo_id = self.get_new_username()
+ silo = self.create_authn_silo(silo_id,
+ user_policy=str(silo_policy),
+ enforced=True)
+
+ # Create a second policy with a different lifetime, so we can verify
+ # the correct policy is enforced.
+ lifetime = 456
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=lifetime)
+
+ # Create a user account assigned to the silo, and also to the policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_silo=silo,
+ assigned_policy=policy)
+
+ # Do not add the user to the silo as a member.
+
+ # Request a Kerberos ticket with a lifetime of two hours, and assert
+ # that the actual lifetime matches the user lifetime set in the
+ # directly-assigned policy.
+ till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=lifetime,
+ expected_renew_life=lifetime)
+
+ def test_authn_silo_not_assigned(self):
+ # Create an authentication policy with the TGT lifetime set.
+ lifetime = 123
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=lifetime)
+
+ # Create an authentication silo with our existing policies.
+ silo_id = self.get_new_username()
+ silo = self.create_authn_silo(silo_id,
+ user_policy=str(policy),
+ enforced=True)
+
+ # Create a user account, but don’t assign it to the silo.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ client_dn_str = str(client_creds.get_dn())
+
+ # Add the user to the silo as a member.
+ self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers',
+ expect_attr=False)
+
+ # Request a Kerberos ticket with a ‘till’ time far in the
+ # future. Assert that the actual lifetime is the maximum allowed by
+ # the Default Domain Policy.
+ till = '99991231235959Z'
+ expected_lifetime = self.get_max_ticket_life()
+ expected_renew_life = self.get_max_renew_life()
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=expected_lifetime,
+ expected_renew_life=expected_renew_life)
+
+ def test_authn_silo_not_assigned_and_assigned_policy(self):
+ # Create an authentication policy with the TGT lifetime set.
+ lifetime = 123
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=lifetime)
+
+ # Create an authentication silo with our existing policies.
+ silo_id = self.get_new_username()
+ silo = self.create_authn_silo(silo_id,
+ user_policy=str(policy),
+ enforced=True)
+
+ # Create a second policy with a different lifetime, so we can verify
+ # the correct policy is enforced.
+ lifetime = 456
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=lifetime)
+
+ # Create a user account assigned to the policy, but not to the silo.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+ client_dn_str = str(client_creds.get_dn())
+
+ # Add the user to the silo as a member.
+ self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers',
+ expect_attr=False)
+
+ # Request a Kerberos ticket with a lifetime of two hours, and assert
+ # that the actual lifetime matches the user lifetime set in the
+ # directly-assigned policy.
+ till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=lifetime,
+ expected_renew_life=lifetime)
+
+ def test_authn_silo_no_applicable_policy(self):
+ # Create an authentication policy with the TGT lifetime set.
+ user_life = 111
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=user_life)
+
+ # Create an authentication silo containing no policies.
+ silo_id = self.get_new_username()
+ silo = self.create_authn_silo(silo_id,
+ enforced=True)
+
+ # Create a user account assigned to the silo, and also to a policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_silo=silo,
+ assigned_policy=policy)
+ client_dn_str = str(client_creds.get_dn())
+
+ # Add the user to the silo as a member.
+ self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers',
+ expect_attr=False)
+
+ # Request a Kerberos ticket with a ‘till’ time far in the
+ # future, and assert that the actual lifetime is the maximum
+ # allowed by the Default Domain Policy.
+ till = '99991231235959Z'
+ expected_lifetime = self.get_max_ticket_life()
+ expected_renew_life = self.get_max_renew_life()
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=expected_lifetime,
+ expected_renew_life=expected_renew_life)
+
+ def test_authn_silo_no_tgt_lifetime(self):
+ # Create an authentication policy with no TGT lifetime set.
+ silo_policy_id = self.get_new_username()
+ silo_policy = self.create_authn_policy(silo_policy_id,
+ enforced=True)
+
+ # Create a second policy with a lifetime set, so we can verify the
+ # correct policy is enforced.
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=456)
+
+ # Create an authentication silo with our existing policy.
+ silo_id = self.get_new_username()
+ silo = self.create_authn_silo(silo_id,
+ user_policy=str(silo_policy),
+ enforced=True)
+
+ # Create a user account assigned to the silo, and also to a policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_silo=silo,
+ assigned_policy=policy)
+ client_dn_str = str(client_creds.get_dn())
+
+ # Add the user to the silo as a member.
+ self.add_to_group(client_dn_str, silo, 'msDS-AuthNPolicySiloMembers',
+ expect_attr=False)
+
+ # Request a Kerberos ticket with a ‘till’ time far in the
+ # future, and assert that the actual lifetime is the maximum
+ # allowed by the Default Domain Policy.
+ till = '99991231235959Z'
+ expected_lifetime = self.get_max_ticket_life()
+ expected_renew_life = self.get_max_renew_life()
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=expected_lifetime,
+ expected_renew_life=expected_renew_life)
+
+ def test_not_a_policy(self):
+ # Create a user account with the assigned policy set to something that
+ # isn’t a policy.
+ samdb = self.get_samdb()
+ client_creds = self._get_creds(
+ account_type=self.AccountType.USER,
+ assigned_policy=samdb.get_default_basedn())
+
+ # Request a Kerberos ticket with a ‘till’ time far in the
+ # future, and assert that the actual lifetime is the maximum
+ # allowed by the Default Domain Policy.
+ till = '99991231235959Z'
+ expected_lifetime = self.get_max_ticket_life()
+ expected_renew_life = self.get_max_renew_life()
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=expected_lifetime,
+ expected_renew_life=expected_renew_life)
+
+ def test_not_a_silo(self):
+ # Create a user account assigned to a silo that isn’t a silo.
+ samdb = self.get_samdb()
+ client_creds = self._get_creds(
+ account_type=self.AccountType.USER,
+ assigned_silo=samdb.get_default_basedn())
+
+ # Request a Kerberos ticket with a ‘till’ time far in the
+ # future, and assert that the actual lifetime is the maximum
+ # allowed by the Default Domain Policy.
+ till = '99991231235959Z'
+ expected_lifetime = self.get_max_ticket_life()
+ expected_renew_life = self.get_max_renew_life()
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=expected_lifetime,
+ expected_renew_life=expected_renew_life)
+
+ def test_not_a_silo_and_policy(self):
+ # Create an authentication policy with the TGT lifetime set.
+ user_life = 123
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_tgt_lifetime=user_life)
+
+ # Create a user account assigned to a silo that isn’t a silo, and also
+ # to a policy.
+ samdb = self.get_samdb()
+ client_creds = self._get_creds(
+ account_type=self.AccountType.USER,
+ assigned_silo=samdb.get_default_basedn(),
+ assigned_policy=policy)
+
+ # Request a Kerberos ticket with a lifetime of two hours, and assert
+ # that the actual lifetime matches the user lifetime set in the
+ # directly-assigned policy.
+ till = self.get_KerberosTime(offset=2 * 60 * 60) # 2 hours
+ tgt = self._get_tgt(client_creds, till=till)
+ self.check_ticket_times(tgt, expected_life=user_life,
+ expected_renew_life=user_life)
+
+ def test_authn_policy_allowed_from_empty(self):
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create an authentication policy with no DACL in the security
+ # descriptor.
+ allowed_from = 'O:SY'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=allowed_from)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Show that we can authenticate using an armor ticket.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_from_user_allow(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+ mach_sid = self.get_objectSid(samdb, mach_creds.get_dn())
+
+ # Create an authentication policy that explicitly allows the machine
+ # account for a user.
+ allowed = f'O:SYD:(A;;CR;;;{mach_sid})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=allowed,
+ service_allowed_from=denied)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Show that we can authenticate using an armor ticket.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_from_user_deny(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+ mach_sid = self.get_objectSid(samdb, mach_creds.get_dn())
+
+ # Create an authentication policy that explicitly denies the machine
+ # account for a user.
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ denied = f'O:SYD:(D;;CR;;;{mach_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=denied,
+ service_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Show that we get a policy error when trying to authenticate.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt,
+ expected_error=KDC_ERR_POLICY)
+
+ def test_authn_policy_allowed_from_service_allow(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+ mach_sid = self.get_objectSid(samdb, mach_creds.get_dn())
+
+ # Create an authentication policy that explicitly allows the machine
+ # account for a service.
+ allowed = f'O:SYD:(A;;CR;;;{mach_sid})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=denied,
+ service_allowed_from=allowed)
+
+ # Create a managed service account with the assigned policy.
+ client_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy)
+
+ # Show that we can authenticate using an armor ticket.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_from_service_deny(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+ mach_sid = self.get_objectSid(samdb, mach_creds.get_dn())
+
+ # Create an authentication policy that explicitly denies the machine
+ # account for a service.
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ denied = f'O:SYD:(D;;CR;;;{mach_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=allowed,
+ service_allowed_from=denied)
+
+ # Create a managed service account with the assigned policy.
+ client_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy)
+
+ # Show that we get a policy error when trying to authenticate.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt,
+ expected_error=KDC_ERR_POLICY)
+
+ def test_authn_policy_allowed_from_no_owner(self):
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create an authentication policy that explicitly allows the machine
+ # account for a user. Omit the owner (O:SY) from the SDDL.
+ allowed = 'D:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Show that we get a generic error if the security descriptor lacks an
+ # owner.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt,
+ expected_error=KDC_ERR_GENERIC)
+
+ def test_authn_policy_allowed_from_no_owner_unenforced(self):
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create an unenforced authentication policy that explicitly allows the
+ # machine account for a user. Omit the owner (O:SY) from the SDDL.
+ allowed = 'D:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=False,
+ user_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Show that we don’t get an error if the policy is unenforced.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_from_owner_self(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+ mach_sid = self.get_objectSid(samdb, mach_creds.get_dn())
+
+ # Create an authentication policy that explicitly allows the machine
+ # account for a user. Set the owner to the machine account.
+ allowed = f'O:{mach_sid}D:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Show that we can authenticate using an armor ticket.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_from_owner_anon(self):
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create an authentication policy that explicitly allows the machine
+ # account for a user. Set the owner to be anonymous.
+ allowed = 'O:AND:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Show that we can authenticate using an armor ticket.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_from_no_fast(self):
+ # Create an authentication policy that restricts authentication.
+ allowed_from = 'O:SY'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=allowed_from)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Show that we cannot authenticate without using an armor ticket.
+ self._get_tgt(client_creds, expected_error=KDC_ERR_POLICY,
+ expect_status=True,
+ expected_status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
+
+ def test_authn_policy_allowed_from_user_allow_group_not_a_member(self):
+ samdb = self.get_samdb()
+
+ # Create a new group.
+ group_name = self.get_new_username()
+ group_dn = self.create_group(samdb, group_name)
+ group_sid = self.get_objectSid(samdb, group_dn)
+
+ # Create a machine account with which to perform FAST and which does
+ # not belong to the group.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create an authentication policy that allows accounts belonging to the
+ # group.
+ allowed = f'O:SYD:(A;;CR;;;{group_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Show that we get a policy error, as the machine account does not
+ # belong to the group.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt,
+ expected_error=KDC_ERR_POLICY)
+
+ def test_authn_policy_allowed_from_user_allow_group_member(self):
+ samdb = self.get_samdb()
+
+ # Create a new group.
+ group_name = self.get_new_username()
+ group_dn = self.create_group(samdb, group_name)
+ group_sid = self.get_objectSid(samdb, group_dn)
+
+ # Create a machine account with which to perform FAST that belongs to
+ # the group.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts={'member_of': (group_dn,)})
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create an authentication policy that allows accounts belonging to the
+ # group.
+ allowed = f'O:SYD:(A;;CR;;;{group_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Show that we can authenticate using an armor ticket, since the
+ # machine account belongs to the group.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_from_user_allow_domain_local_group(self):
+ samdb = self.get_samdb()
+
+ # Create a new domain-local group.
+ group_name = self.get_new_username()
+ group_dn = self.create_group(samdb, group_name,
+ gtype=GroupType.DOMAIN_LOCAL.value)
+ group_sid = self.get_objectSid(samdb, group_dn)
+
+ # Create a machine account with which to perform FAST that belongs to
+ # the group.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts={'member_of': (group_dn,)})
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create an authentication policy that allows accounts belonging to the
+ # group.
+ allowed = f'O:SYD:(A;;CR;;;{group_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Show that the groups in the armor ticket are expanded to include the
+ # domain-local group.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_from_user_allow_asserted_identity(self):
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create an authentication policy that allows accounts with the
+ # Authentication Authority Asserted Identity SID.
+ allowed = (
+ f'O:SYD:(A;;CR;;;'
+ f'{security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY})'
+ )
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Show that authentication is allowed.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_from_user_allow_claims_valid(self):
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create an authentication policy that allows accounts with the
+ # Claims Valid SID.
+ allowed = f'O:SYD:(A;;CR;;;{security.SID_CLAIMS_VALID})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Show that authentication is allowed.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_from_user_allow_compounded_auth(self):
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create an authentication policy that allows accounts with the
+ # Compounded Authentication SID.
+ allowed = f'O:SYD:(A;;CR;;;{security.SID_COMPOUNDED_AUTHENTICATION})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Show that authentication is denied.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt,
+ expected_error=KDC_ERR_POLICY)
+
+ def test_authn_policy_allowed_from_user_allow_authenticated_users(self):
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create an authentication policy that allows accounts with the
+ # Authenticated Users SID.
+ allowed = f'O:SYD:(A;;CR;;;{security.SID_NT_AUTHENTICATED_USERS})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Show that authentication is allowed.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_from_user_allow_ntlm_authn(self):
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create an authentication policy that allows accounts with the NTLM
+ # Authentication SID.
+ allowed = f'O:SYD:(A;;CR;;;{security.SID_NT_NTLM_AUTHENTICATION})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Show that authentication is denied.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt,
+ expected_error=KDC_ERR_POLICY)
+
+ def test_authn_policy_allowed_from_user_allow_from_rodc(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ allowed_rodc=True)
+ # Modify the TGT to be issued by an RODC.
+ mach_tgt = self.issued_by_rodc(self.get_tgt(mach_creds))
+ mach_sid = self.get_objectSid(samdb, mach_creds.get_dn())
+
+ # Create an authentication policy that explicitly allows the machine
+ # account for a user.
+ allowed = f'O:SYD:(A;;CR;;;{mach_sid})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=allowed,
+ service_allowed_from=denied)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Show that we can authenticate using an armor ticket.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_from_user_deny_from_rodc(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ allowed_rodc=True)
+ # Modify the TGT to be issued by an RODC.
+ mach_tgt = self.issued_by_rodc(self.get_tgt(mach_creds))
+ mach_sid = self.get_objectSid(samdb, mach_creds.get_dn())
+
+ # Create an authentication policy that explicitly denies the machine
+ # account for a user.
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ denied = f'O:SYD:(D;;CR;;;{mach_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=denied,
+ service_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Show that we get a policy error when trying to authenticate.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt,
+ expected_error=KDC_ERR_POLICY)
+
+ def test_authn_policy_allowed_from_service_allow_from_rodc(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ allowed_rodc=True)
+ # Modify the TGT to be issued by an RODC.
+ mach_tgt = self.issued_by_rodc(self.get_tgt(mach_creds))
+ mach_sid = self.get_objectSid(samdb, mach_creds.get_dn())
+
+ # Create an authentication policy that explicitly allows the machine
+ # account for a service.
+ allowed = f'O:SYD:(A;;CR;;;{mach_sid})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=denied,
+ service_allowed_from=allowed)
+
+ # Create a managed service account with the assigned policy.
+ client_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy)
+
+ # Show that we can authenticate using an armor ticket.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_from_service_deny_from_rodc(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ allowed_rodc=True)
+ # Modify the TGT to be issued by an RODC.
+ mach_tgt = self.issued_by_rodc(self.get_tgt(mach_creds))
+ mach_sid = self.get_objectSid(samdb, mach_creds.get_dn())
+
+ # Create an authentication policy that explicitly denies the machine
+ # account for a service.
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ denied = f'O:SYD:(D;;CR;;;{mach_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=allowed,
+ service_allowed_from=denied)
+
+ # Create a managed service account with the assigned policy.
+ client_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy)
+
+ # Show that we get a policy error when trying to authenticate.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt,
+ expected_error=KDC_ERR_POLICY)
+
+ def test_authn_policy_allowed_from_user_allow_group_not_a_member_from_rodc(self):
+ samdb = self.get_samdb()
+
+ # Create a new group.
+ group_name = self.get_new_username()
+ group_dn = self.create_group(samdb, group_name)
+ group_sid = self.get_objectSid(samdb, group_dn)
+
+ # Create a machine account with which to perform FAST and which does
+ # not belong to the group.
+ mach_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ allowed_rodc=True)
+ # Modify the TGT to be issued by an RODC.
+ mach_tgt = self.issued_by_rodc(self.get_tgt(mach_creds))
+
+ # Create an authentication policy that allows accounts belonging to the
+ # group.
+ allowed = f'O:SYD:(A;;CR;;;{group_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Show that we get a policy error, as the machine account does not
+ # belong to the group.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt,
+ expected_error=KDC_ERR_POLICY)
+
+ def test_authn_policy_allowed_from_user_allow_group_member_from_rodc(self):
+ samdb = self.get_samdb()
+
+ # Create a new group.
+ group_name = self.get_new_username()
+ group_dn = self.create_group(samdb, group_name)
+ group_sid = self.get_objectSid(samdb, group_dn)
+
+ # Create a machine account with which to perform FAST that belongs to
+ # the group.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts={'member_of': (group_dn,),
+ 'allowed_replication_mock': True,
+ 'revealed_to_mock_rodc': True})
+ # Modify the TGT to be issued by an RODC.
+ mach_tgt = self.issued_by_rodc(self.get_tgt(mach_creds))
+
+ # Create an authentication policy that allows accounts belonging to the
+ # group.
+ allowed = f'O:SYD:(A;;CR;;;{group_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Show that we can authenticate using an armor ticket, since the
+ # machine account belongs to the group.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_from_user_allow_domain_local_group_from_rodc(self):
+ samdb = self.get_samdb()
+
+ # Create a new domain-local group.
+ group_name = self.get_new_username()
+ group_dn = self.create_group(samdb, group_name,
+ gtype=GroupType.DOMAIN_LOCAL.value)
+ group_sid = self.get_objectSid(samdb, group_dn)
+
+ # Create a machine account with which to perform FAST that belongs to
+ # the group.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts={'member_of': (group_dn,),
+ 'allowed_replication_mock': True,
+ 'revealed_to_mock_rodc': True})
+ # Modify the TGT to be issued by an RODC.
+ mach_tgt = self.issued_by_rodc(self.get_tgt(mach_creds))
+
+ # Create an authentication policy that allows accounts belonging to the
+ # group.
+ allowed = f'O:SYD:(A;;CR;;;{group_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Show that the groups in the armor ticket are expanded to include the
+ # domain-local group.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_from_user_allow_asserted_identity_from_rodc(self):
+ # Create a machine account with which to perform FAST.
+ mach_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ allowed_rodc=True)
+ # Modify the TGT to be issued by an RODC.
+ mach_tgt = self.issued_by_rodc(self.get_tgt(mach_creds))
+
+ # Create an authentication policy that allows accounts with the
+ # Authentication Authority Asserted Identity SID.
+ allowed = (
+ f'O:SYD:(A;;CR;;;'
+ f'{security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY})'
+ )
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Show that authentication is allowed.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_from_user_allow_claims_valid_from_rodc(self):
+ # Create a machine account with which to perform FAST.
+ mach_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ allowed_rodc=True)
+ # Modify the TGT to be issued by an RODC.
+ mach_tgt = self.issued_by_rodc(self.get_tgt(mach_creds))
+
+ # Create an authentication policy that allows accounts with the
+ # Claims Valid SID.
+ allowed = f'O:SYD:(A;;CR;;;{security.SID_CLAIMS_VALID})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Show that authentication is allowed.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_from_user_allow_compounded_authn_from_rodc(self):
+ # Create a machine account with which to perform FAST.
+ mach_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ allowed_rodc=True)
+ # Modify the TGT to be issued by an RODC.
+ mach_tgt = self.issued_by_rodc(self.get_tgt(mach_creds))
+
+ # Create an authentication policy that allows accounts with the
+ # Compounded Authentication SID.
+ allowed = f'O:SYD:(A;;CR;;;{security.SID_COMPOUNDED_AUTHENTICATION})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Show that authentication is denied.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt,
+ expected_error=KDC_ERR_POLICY)
+
+ def test_authn_policy_allowed_from_user_allow_authenticated_users_from_rodc(self):
+ # Create a machine account with which to perform FAST.
+ mach_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ allowed_rodc=True)
+ # Modify the TGT to be issued by an RODC.
+ mach_tgt = self.issued_by_rodc(self.get_tgt(mach_creds))
+
+ # Create an authentication policy that allows accounts with the
+ # Authenticated Users SID.
+ allowed = f'O:SYD:(A;;CR;;;{security.SID_NT_AUTHENTICATED_USERS})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Show that authentication is allowed.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_from_user_allow_ntlm_authn_from_rodc(self):
+ # Create a machine account with which to perform FAST.
+ mach_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ allowed_rodc=True)
+ # Modify the TGT to be issued by an RODC.
+ mach_tgt = self.issued_by_rodc(self.get_tgt(mach_creds))
+
+ # Create an authentication policy that allows accounts with the NTLM
+ # Authentication SID.
+ allowed = f'O:SYD:(A;;CR;;;{security.SID_NT_NTLM_AUTHENTICATION})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy)
+
+ # Show that authentication is denied.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt,
+ expected_error=KDC_ERR_POLICY)
+
+ def test_authn_policy_allowed_from_user_deny_user(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+ mach_sid = self.get_objectSid(samdb, mach_creds.get_dn())
+
+ # Create a user account.
+ client_creds = self.get_cached_creds(account_type=self.AccountType.USER,
+ use_cache=False)
+ client_dn = client_creds.get_dn()
+ client_sid = self.get_objectSid(samdb, client_dn)
+
+ # Create an authentication policy that explicitly allows the machine
+ # account for a user, while denying the user account itself.
+ allowed = f'O:SYD:(A;;CR;;;{mach_sid})(D;;CR;;;{client_sid})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=allowed,
+ service_allowed_from=denied)
+
+ # Assign the policy to the user account.
+ self.add_attribute(samdb, str(client_dn),
+ 'msDS-AssignedAuthNPolicy', str(policy))
+
+ # Show that authentication is allowed.
+ self._get_tgt(client_creds, armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_to_empty(self):
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy with no DACL in the security
+ # descriptor.
+ allowed_to = 'O:SY'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ computer_allowed_to=allowed_to)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that authentication is allowed.
+ self._tgs_req(tgt, 0, client_creds, target_creds,
+ armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_to_computer_allow(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+ tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly allows the user account to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{client_sid})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that obtaining a service ticket is allowed.
+ self._tgs_req(tgt, 0, client_creds, target_creds,
+ armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_to_computer_deny(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+ tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly denies the user account to obtain a service ticket.
+ denied = f'O:SYD:(D;;CR;;;{client_sid})'
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=allowed,
+ computer_allowed_to=denied,
+ service_allowed_to=allowed)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that obtaining a service ticket is denied.
+ self._tgs_req(
+ tgt, KDC_ERR_POLICY, client_creds, target_creds,
+ armor_tgt=mach_tgt,
+ expect_edata=self.expect_padata_outer,
+ # We aren’t particular about whether or not we get an NTSTATUS.
+ expect_status=None,
+ expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
+ check_patypes=False)
+
+ def test_authn_policy_allowed_to_computer_allow_but_deny_mach(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+ mach_sid = self.get_objectSid(samdb, mach_creds.get_dn())
+
+ # Create a user account.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+ tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly allows the user account to obtain a service ticket, while
+ # explicitly denying the machine account.
+ allowed = f'O:SYD:(A;;CR;;;{client_sid})(D;;CR;;;{mach_sid})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Despite the documentation’s claims that the machine account is also
+ # access-checked, obtaining a service ticket is allowed.
+ self._tgs_req(tgt, 0, client_creds, target_creds,
+ armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_to_computer_allow_mach(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+ mach_sid = self.get_objectSid(samdb, mach_creds.get_dn())
+
+ # Create a user account.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly allows the machine account to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{mach_sid})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that obtaining a service ticket is denied.
+ self._tgs_req(
+ tgt, KDC_ERR_POLICY, client_creds, target_creds,
+ armor_tgt=mach_tgt,
+ expect_edata=self.expect_padata_outer,
+ # We aren’t particular about whether or not we get an NTSTATUS.
+ expect_status=None,
+ expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
+ check_patypes=False)
+
+ def test_authn_policy_allowed_no_fast(self):
+ samdb = self.get_samdb()
+
+ # Create a user account.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+ tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly allows the user account to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{client_sid})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that obtaining a service ticket is allowed without an armor TGT.
+ self._tgs_req(tgt, 0, client_creds, target_creds)
+
+ def test_authn_policy_denied_no_fast(self):
+ samdb = self.get_samdb()
+
+ # Create a user account.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+ tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly disallows the user account to obtain a service ticket.
+ denied = f'O:SYD:(D;;CR;;;{client_sid})'
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=allowed,
+ computer_allowed_to=denied,
+ service_allowed_to=allowed)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that obtaining a service ticket is not allowed.
+ self._tgs_req(
+ tgt, KDC_ERR_POLICY, client_creds, target_creds,
+ expect_edata=self.expect_padata_outer,
+ expect_status=True,
+ expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
+
+ def test_authn_policy_allowed_to_computer_allow_asserted_identity(self):
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that allows accounts with the
+ # Authentication Authority Asserted Identity SID to obtain a service
+ # ticket.
+ allowed = (
+ f'O:SYD:(A;;CR;;;'
+ f'{security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY})'
+ )
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that obtaining a service ticket is allowed.
+ self._tgs_req(tgt, 0, client_creds, target_creds,
+ armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_to_computer_allow_claims_valid(self):
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that allows accounts with the Claims
+ # Valid SID to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{security.SID_CLAIMS_VALID})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that obtaining a service ticket is allowed.
+ self._tgs_req(tgt, 0, client_creds, target_creds,
+ armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_to_computer_allow_compounded_auth(self):
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that allows accounts with the
+ # Compounded Authentication SID to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{security.SID_COMPOUNDED_AUTHENTICATION})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that obtaining a service ticket is denied.
+ self._tgs_req(
+ tgt, KDC_ERR_POLICY, client_creds, target_creds,
+ armor_tgt=mach_tgt,
+ expect_edata=self.expect_padata_outer,
+ # We aren’t particular about whether or not we get an NTSTATUS.
+ expect_status=None,
+ expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
+ check_patypes=False)
+
+ def test_authn_policy_allowed_to_computer_allow_authenticated_users(self):
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that allows accounts with the
+ # Authenticated Users SID to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{security.SID_NT_AUTHENTICATED_USERS})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that obtaining a service ticket is allowed.
+ self._tgs_req(tgt, 0, client_creds, target_creds,
+ armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_to_computer_allow_ntlm_authn(self):
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that allows accounts with the NTLM
+ # Authentication SID to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{security.SID_NT_NTLM_AUTHENTICATION})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that obtaining a service ticket is denied.
+ self._tgs_req(
+ tgt, KDC_ERR_POLICY, client_creds, target_creds,
+ armor_tgt=mach_tgt,
+ expect_edata=self.expect_padata_outer,
+ # We aren’t particular about whether or not we get an NTSTATUS.
+ expect_status=None,
+ expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
+ check_patypes=False)
+
+ def test_authn_policy_allowed_to_no_owner(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+ tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly allows the user account to obtain a service ticket. Omit
+ # the owner (O:SY) from the SDDL.
+ allowed = f'D:(A;;CR;;;{client_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ computer_allowed_to=allowed)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that obtaining a service ticket is denied.
+ self._tgs_req(tgt, KDC_ERR_POLICY, client_creds, target_creds,
+ armor_tgt=mach_tgt,
+ expect_edata=self.expect_padata_outer,
+ # We aren’t particular about whether or not we get an NTSTATUS.
+ expect_status=None,
+ expected_status=ntstatus.NT_STATUS_INVALID_PARAMETER,
+ check_patypes=False)
+
+ def test_authn_policy_allowed_to_no_owner_unenforced(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+ tgt = self.get_tgt(client_creds)
+
+ # Create an unenforced authentication policy that applies to a computer
+ # and explicitly allows the user account to obtain a service
+ # ticket. Omit the owner (O:SY) from the SDDL.
+ allowed = f'D:(A;;CR;;;{client_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=False,
+ computer_allowed_to=allowed)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that obtaining a service ticket is allowed.
+ self._tgs_req(tgt, 0, client_creds, target_creds,
+ armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_to_owner_self(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+ tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly allows the user account to obtain a service ticket. Set
+ # the owner to the user account.
+ allowed = f'O:{client_sid}D:(A;;CR;;;{client_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ computer_allowed_to=allowed)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that obtaining a service ticket is allowed.
+ self._tgs_req(tgt, 0, client_creds, target_creds,
+ armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_to_owner_anon(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+ tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly allows the user account to obtain a service ticket. Set
+ # the owner to be anonymous.
+ allowed = f'O:AND:(A;;CR;;;{client_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ computer_allowed_to=allowed)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that obtaining a service ticket is allowed.
+ self._tgs_req(tgt, 0, client_creds, target_creds,
+ armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_to_user_allow(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+ tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that applies to a user and explicitly
+ # allows the user account to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{client_sid})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=allowed,
+ computer_allowed_to=denied,
+ service_allowed_to=denied)
+
+ # Create a user account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy,
+ spn='host/{account}')
+
+ # Show that obtaining a service ticket is allowed.
+ self._tgs_req(tgt, 0, client_creds, target_creds,
+ armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_to_user_deny(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+ tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that applies to a user and
+ # explicitly denies the user account to obtain a service ticket.
+ denied = f'O:SYD:(D;;CR;;;{client_sid})'
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=allowed,
+ service_allowed_to=allowed)
+
+ # Create a user account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy,
+ spn='host/{account}')
+
+ # Show that obtaining a service ticket is denied.
+ self._tgs_req(
+ tgt, KDC_ERR_POLICY, client_creds, target_creds,
+ armor_tgt=mach_tgt,
+ expect_edata=self.expect_padata_outer,
+ # We aren’t particular about whether or not we get an NTSTATUS.
+ expect_status=None,
+ expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
+ check_patypes=False)
+
+ def test_authn_policy_allowed_to_service_allow(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+ tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that applies to a managed service and
+ # explicitly allows the user account to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{client_sid})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=denied,
+ service_allowed_to=allowed)
+
+ # Create a managed service account with the assigned policy.
+ target_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy)
+
+ # Show that obtaining a service ticket is allowed.
+ self._tgs_req(tgt, 0, client_creds, target_creds,
+ armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_to_service_deny(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+ tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that applies to a managed service and
+ # explicitly denies the user account to obtain a service ticket.
+ denied = f'O:SYD:(D;;CR;;;{client_sid})'
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=allowed,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Create a managed service account with the assigned policy.
+ target_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy)
+
+ # Show that obtaining a service ticket is denied.
+ self._tgs_req(
+ tgt, KDC_ERR_POLICY, client_creds, target_creds,
+ armor_tgt=mach_tgt,
+ expect_edata=self.expect_padata_outer,
+ # We aren’t particular about whether or not we get an NTSTATUS.
+ expect_status=None,
+ expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
+ check_patypes=False)
+
+ def test_authn_policy_allowed_to_user_allow_from_rodc(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ allowed_rodc=True)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+ # Modify the TGT to be issued by an RODC.
+ tgt = self.issued_by_rodc(self.get_tgt(client_creds))
+
+ # Create an authentication policy that applies to a user and explicitly
+ # allows the user account to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{client_sid})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=allowed,
+ computer_allowed_to=denied,
+ service_allowed_to=denied)
+
+ # Create a user account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy,
+ spn='host/{account}')
+
+ # Show that obtaining a service ticket is allowed.
+ self._tgs_req(tgt, 0, client_creds, target_creds,
+ armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_to_user_deny_from_rodc(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ allowed_rodc=True)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+ # Modify the TGT to be issued by an RODC.
+ tgt = self.issued_by_rodc(self.get_tgt(client_creds))
+
+ # Create an authentication policy that applies to a user and
+ # explicitly denies the user account to obtain a service ticket.
+ denied = f'O:SYD:(D;;CR;;;{client_sid})'
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=allowed,
+ service_allowed_to=allowed)
+
+ # Create a user account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy,
+ spn='host/{account}')
+
+ # Show that obtaining a service ticket is denied.
+ self._tgs_req(
+ tgt, KDC_ERR_POLICY, client_creds, target_creds,
+ armor_tgt=mach_tgt,
+ expect_edata=self.expect_padata_outer,
+ # We aren’t particular about whether or not we get an NTSTATUS.
+ expect_status=None,
+ expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
+
+ def test_authn_policy_allowed_to_computer_allow_from_rodc(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ allowed_rodc=True)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+ # Modify the TGT to be issued by an RODC.
+ tgt = self.issued_by_rodc(self.get_tgt(client_creds))
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly allows the user account to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{client_sid})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that obtaining a service ticket is allowed.
+ self._tgs_req(tgt, 0, client_creds, target_creds,
+ armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_to_computer_deny_from_rodc(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ allowed_rodc=True)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+ # Modify the TGT to be issued by an RODC.
+ tgt = self.issued_by_rodc(self.get_tgt(client_creds))
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly denies the user account to obtain a service ticket.
+ denied = f'O:SYD:(D;;CR;;;{client_sid})'
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=allowed,
+ computer_allowed_to=denied,
+ service_allowed_to=allowed)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that obtaining a service ticket is denied.
+ self._tgs_req(
+ tgt, KDC_ERR_POLICY, client_creds, target_creds,
+ armor_tgt=mach_tgt,
+ expect_edata=self.expect_padata_outer,
+ # We aren’t particular about whether or not we get an NTSTATUS.
+ expect_status=None,
+ expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,)
+
+ def test_authn_policy_allowed_to_service_allow_from_rodc(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ allowed_rodc=True)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+ # Modify the TGT to be issued by an RODC.
+ tgt = self.issued_by_rodc(self.get_tgt(client_creds))
+
+ # Create an authentication policy that applies to a managed service and
+ # explicitly allows the user account to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{client_sid})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=denied,
+ service_allowed_to=allowed)
+
+ # Create a managed service account with the assigned policy.
+ target_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy)
+
+ # Show that obtaining a service ticket is allowed.
+ self._tgs_req(tgt, 0, client_creds, target_creds,
+ armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_to_service_deny_from_rodc(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ allowed_rodc=True)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+ # Modify the TGT to be issued by an RODC.
+ tgt = self.issued_by_rodc(self.get_tgt(client_creds))
+
+ # Create an authentication policy that applies to a managed service and
+ # explicitly denies the user account to obtain a service ticket.
+ denied = f'O:SYD:(D;;CR;;;{client_sid})'
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=allowed,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Create a managed service account with the assigned policy.
+ target_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy)
+
+ # Show that obtaining a service ticket is denied.
+ self._tgs_req(
+ tgt, KDC_ERR_POLICY, client_creds, target_creds,
+ armor_tgt=mach_tgt,
+ expect_edata=self.expect_padata_outer,
+ # We aren’t particular about whether or not we get an NTSTATUS.
+ expect_status=None,
+ expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
+
+ def test_authn_policy_allowed_to_user_allow_group_not_a_member(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a new group.
+ group_name = self.get_new_username()
+ group_dn = self.create_group(samdb, group_name)
+ group_sid = self.get_objectSid(samdb, group_dn)
+
+ # Create a user account which does not belong to the group.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that allows accounts belonging to the
+ # group.
+ allowed = f'O:SYD:(A;;CR;;;{group_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=allowed)
+
+ # Create a user account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy,
+ spn='host/{account}')
+
+ # Show that we get a policy error, as the user account does not belong
+ # to the group.
+ self._tgs_req(
+ tgt, KDC_ERR_POLICY, client_creds, target_creds,
+ armor_tgt=mach_tgt,
+ expect_edata=self.expect_padata_outer,
+ # We aren’t particular about whether or not we get an NTSTATUS.
+ expect_status=None,
+ expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
+ check_patypes=False)
+
+ def test_authn_policy_allowed_to_user_allow_group_member(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a new group.
+ group_name = self.get_new_username()
+ group_dn = self.create_group(samdb, group_name)
+ group_sid = self.get_objectSid(samdb, group_dn)
+
+ # Create a user account that belongs to the group.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER,
+ opts={'member_of': (group_dn,)})
+ tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that allows accounts belonging to the
+ # group.
+ allowed = f'O:SYD:(A;;CR;;;{group_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=allowed)
+
+ # Create a user account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy,
+ spn='host/{account}')
+
+ # Show that we can get a service ticket, since the user account belongs
+ # to the group.
+ self._tgs_req(tgt, 0, client_creds, target_creds,
+ armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_to_user_allow_domain_local_group(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a new domain-local group.
+ group_name = self.get_new_username()
+ group_dn = self.create_group(samdb, group_name,
+ gtype=GroupType.DOMAIN_LOCAL.value)
+ group_sid = self.get_objectSid(samdb, group_dn)
+
+ # Create a user account that belongs to the group.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER,
+ opts={'member_of': (group_dn,)})
+ tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that allows accounts belonging to the
+ # group.
+ allowed = f'O:SYD:(A;;CR;;;{group_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=allowed)
+
+ # Create a user account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy,
+ spn='host/{account}')
+
+ # Show that the groups in the TGT are expanded to include the
+ # domain-local group.
+ self._tgs_req(tgt, 0, client_creds, target_creds,
+ armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_to_computer_allow_asserted_identity_from_rodc(self):
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ allowed_rodc=True)
+ # Modify the TGT to be issued by an RODC.
+ tgt = self.issued_by_rodc(self.get_tgt(client_creds))
+
+ # Create an authentication policy that allows accounts with the
+ # Authentication Authority Asserted Identity SID to obtain a service
+ # ticket.
+ allowed = (
+ f'O:SYD:(A;;CR;;;'
+ f'{security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY})'
+ )
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that obtaining a service ticket is allowed.
+ self._tgs_req(tgt, 0, client_creds, target_creds,
+ armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_to_computer_allow_claims_valid_from_rodc(self):
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ allowed_rodc=True)
+ # Modify the TGT to be issued by an RODC.
+ tgt = self.issued_by_rodc(self.get_tgt(client_creds))
+
+ # Create an authentication policy that allows accounts with the Claims
+ # Valid SID to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{security.SID_CLAIMS_VALID})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that obtaining a service ticket is not allowed.
+ self._tgs_req(tgt, KDC_ERR_POLICY, client_creds, target_creds,
+ armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_to_computer_allow_compounded_authn_from_rodc(self):
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ allowed_rodc=True)
+ # Modify the TGT to be issued by an RODC.
+ tgt = self.issued_by_rodc(self.get_tgt(client_creds))
+
+ # Create an authentication policy that allows accounts with the
+ # Compounded Authentication SID to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{security.SID_COMPOUNDED_AUTHENTICATION})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that obtaining a service ticket is denied.
+ self._tgs_req(
+ tgt, KDC_ERR_POLICY, client_creds, target_creds,
+ armor_tgt=mach_tgt,
+ expect_edata=self.expect_padata_outer,
+ # We aren’t particular about whether or not we get an NTSTATUS.
+ expect_status=None,
+ expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
+
+ def test_authn_policy_allowed_to_computer_allow_authenticated_users_from_rodc(self):
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ allowed_rodc=True)
+ # Modify the TGT to be issued by an RODC.
+ tgt = self.issued_by_rodc(self.get_tgt(client_creds))
+
+ # Create an authentication policy that allows accounts with the
+ # Authenticated Users SID to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{security.SID_NT_AUTHENTICATED_USERS})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that obtaining a service ticket is allowed.
+ self._tgs_req(tgt, 0, client_creds, target_creds,
+ armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_to_computer_allow_ntlm_authn_from_rodc(self):
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ allowed_rodc=True)
+ # Modify the TGT to be issued by an RODC.
+ tgt = self.issued_by_rodc(self.get_tgt(client_creds))
+
+ # Create an authentication policy that allows accounts with the NTLM
+ # Authentication SID to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{security.SID_NT_NTLM_AUTHENTICATION})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that obtaining a service ticket is denied.
+ self._tgs_req(
+ tgt, KDC_ERR_POLICY, client_creds, target_creds,
+ armor_tgt=mach_tgt,
+ expect_edata=self.expect_padata_outer,
+ # We aren’t particular about whether or not we get an NTSTATUS.
+ expect_status=None,
+ expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
+
+ def test_authn_policy_allowed_to_user_allow_group_not_a_member_from_rodc(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a new group.
+ group_name = self.get_new_username()
+ group_dn = self.create_group(samdb, group_name)
+ group_sid = self.get_objectSid(samdb, group_dn)
+
+ # Create a user account which does not belong to the group.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ allowed_rodc=True)
+ # Modify the TGT to be issued by an RODC.
+ tgt = self.issued_by_rodc(self.get_tgt(client_creds))
+
+ # Create an authentication policy that allows accounts belonging to the
+ # group.
+ allowed = f'O:SYD:(A;;CR;;;{group_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=allowed)
+
+ # Create a user account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy,
+ spn='host/{account}')
+
+ # Show that we get a policy error, as the user account does not belong
+ # to the group.
+ self._tgs_req(
+ tgt, KDC_ERR_POLICY, client_creds, target_creds,
+ armor_tgt=mach_tgt,
+ expect_edata=self.expect_padata_outer,
+ # We aren’t particular about whether or not we get an NTSTATUS.
+ expect_status=None,
+ expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
+
+ def test_authn_policy_allowed_to_user_allow_group_member_from_rodc(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a new group.
+ group_name = self.get_new_username()
+ group_dn = self.create_group(samdb, group_name)
+ group_sid = self.get_objectSid(samdb, group_dn)
+
+ # Create a user account that belongs to the group.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER,
+ opts={'member_of': (group_dn,),
+ 'allowed_replication_mock': True,
+ 'revealed_to_mock_rodc': True})
+ # Modify the TGT to be issued by an RODC.
+ tgt = self.issued_by_rodc(self.get_tgt(client_creds))
+
+ # Create an authentication policy that allows accounts belonging to the
+ # group.
+ allowed = f'O:SYD:(A;;CR;;;{group_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=allowed)
+
+ # Create a user account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy,
+ spn='host/{account}')
+
+ # Show that we can get a service ticket, since the user account belongs
+ # to the group.
+ self._tgs_req(tgt, 0, client_creds, target_creds,
+ armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_to_user_allow_domain_local_group_from_rodc(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a new domain-local group.
+ group_name = self.get_new_username()
+ group_dn = self.create_group(samdb, group_name,
+ gtype=GroupType.DOMAIN_LOCAL.value)
+ group_sid = self.get_objectSid(samdb, group_dn)
+
+ # Create a user account that belongs to the group.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER,
+ opts={'member_of': (group_dn,),
+ 'allowed_replication_mock': True,
+ 'revealed_to_mock_rodc': True})
+ # Modify the TGT to be issued by an RODC.
+ tgt = self.issued_by_rodc(self.get_tgt(client_creds))
+
+ # Create an authentication policy that allows accounts belonging to the
+ # group.
+ allowed = f'O:SYD:(A;;CR;;;{group_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=allowed)
+
+ # Create a user account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy,
+ spn='host/{account}')
+
+ # Show that the groups in the TGT are expanded to include the
+ # domain-local group.
+ self._tgs_req(tgt, 0, client_creds, target_creds,
+ armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_to_computer_allow_to_self(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a computer account.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts={'id': 1},
+ use_cache=False)
+ client_dn = client_creds.get_dn()
+ client_sid = self.get_objectSid(samdb, client_dn)
+ tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly allows the user account to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{client_sid})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Assign the policy to the account.
+ self.add_attribute(samdb, str(client_dn),
+ 'msDS-AssignedAuthNPolicy', str(policy))
+
+ # Show that obtaining a service ticket to ourselves is allowed.
+ self._tgs_req(tgt, 0, client_creds, client_creds,
+ armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_to_computer_deny_to_self(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a computer account.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts={'id': 1},
+ use_cache=False)
+ client_dn = client_creds.get_dn()
+ client_sid = self.get_objectSid(samdb, client_dn)
+ tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly denies the user account to obtain a service ticket.
+ denied = f'O:SYD:(D;;CR;;;{client_sid})'
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=allowed,
+ computer_allowed_to=denied,
+ service_allowed_to=allowed)
+
+ # Assign the policy to the account.
+ self.add_attribute(samdb, str(client_dn),
+ 'msDS-AssignedAuthNPolicy', str(policy))
+
+ # Show that obtaining a service ticket to ourselves is allowed, despite
+ # the policy disallowing it.
+ self._tgs_req(tgt, 0, client_creds, client_creds,
+ armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_to_computer_allow_to_self_with_self(self):
+ samdb = self.get_samdb()
+
+ # Create a computer account.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ use_cache=False)
+ client_dn = client_creds.get_dn()
+ client_sid = self.get_objectSid(samdb, client_dn)
+ tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly allows the account to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{client_sid})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Assign the policy to the account.
+ self.add_attribute(samdb, str(client_dn),
+ 'msDS-AssignedAuthNPolicy', str(policy))
+
+ # Show that obtaining a service ticket to ourselves armored with our
+ # own TGT is allowed.
+ self._tgs_req(tgt, 0, client_creds, client_creds,
+ armor_tgt=tgt)
+
+ def test_authn_policy_allowed_to_computer_deny_to_self_with_self(self):
+ samdb = self.get_samdb()
+
+ # Create a computer account.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ use_cache=False)
+ client_dn = client_creds.get_dn()
+ client_sid = self.get_objectSid(samdb, client_dn)
+ tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly denies the account to obtain a service ticket.
+ denied = f'O:SYD:(D;;CR;;;{client_sid})'
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=allowed,
+ computer_allowed_to=denied,
+ service_allowed_to=allowed)
+
+ # Assign the policy to the account.
+ self.add_attribute(samdb, str(client_dn),
+ 'msDS-AssignedAuthNPolicy', str(policy))
+
+ # Show that obtaining a service ticket to ourselves armored with our
+ # own TGT is allowed, despite the policy’s disallowing it.
+ self._tgs_req(tgt, 0, client_creds, client_creds,
+ armor_tgt=tgt)
+
+ def test_authn_policy_allowed_to_user_allow_s4u2self(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ client_cname = self.PrincipalName_create(
+ name_type=NT_PRINCIPAL,
+ names=[client_creds.get_username()])
+ client_realm = client_creds.get_realm()
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly allows the user account to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{client_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ computer_allowed_to=allowed)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+ target_tgt = self.get_tgt(target_creds)
+
+ def generate_s4u2self_padata(_kdc_exchange_dict,
+ _callback_dict,
+ req_body):
+ padata = self.PA_S4U2Self_create(
+ name=client_cname,
+ realm=client_realm,
+ tgt_session_key=target_tgt.session_key,
+ ctype=None)
+
+ return [padata], req_body
+
+ # Show that obtaining a service ticket with S4U2Self is allowed.
+ self._tgs_req(target_tgt, 0, target_creds, target_creds,
+ expected_cname=client_cname,
+ generate_fast_padata_fn=generate_s4u2self_padata,
+ armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_to_user_deny_s4u2self(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ client_cname = self.PrincipalName_create(
+ name_type=NT_PRINCIPAL,
+ names=[client_creds.get_username()])
+ client_realm = client_creds.get_realm()
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly denies the user account to obtain a service ticket.
+ denied = f'O:SYD:(D;;CR;;;{client_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ computer_allowed_to=denied)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+ target_tgt = self.get_tgt(target_creds)
+
+ def generate_s4u2self_padata(_kdc_exchange_dict,
+ _callback_dict,
+ req_body):
+ padata = self.PA_S4U2Self_create(
+ name=client_cname,
+ realm=client_realm,
+ tgt_session_key=target_tgt.session_key,
+ ctype=None)
+
+ return [padata], req_body
+
+ # Show that obtaining a service ticket with S4U2Self is allowed,
+ # despite the policy.
+ self._tgs_req(target_tgt, 0, target_creds, target_creds,
+ expected_cname=client_cname,
+ generate_fast_padata_fn=generate_s4u2self_padata,
+ armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_to_user_allow_constrained_delegation(self):
+ samdb = self.get_samdb()
+
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ client_dn = client_creds.get_dn()
+ client_sid = self.get_objectSid(samdb, client_dn)
+
+ client_username = client_creds.get_username()
+ client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
+ names=[client_username])
+
+ client_tkt_options = 'forwardable'
+ expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
+
+ client_tgt = self.get_tgt(client_creds,
+ kdc_options=client_tkt_options,
+ expected_flags=expected_flags)
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a target account.
+ target_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts={'id': 1},
+ use_cache=False)
+ target_spn = target_creds.get_spn()
+
+ service_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts={
+ 'delegation_to_spn': target_spn,
+ })
+ service_sid = self.get_objectSid(samdb, service_creds.get_dn())
+ service_tgt = self.get_tgt(service_creds)
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly allows the service account to obtain a service ticket,
+ # while denying the user.
+ allowed = f'O:SYD:(A;;CR;;;{service_sid})(D;;CR;;;{client_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ computer_allowed_to=allowed)
+
+ # Assign the policy to the target account.
+ self.add_attribute(samdb, str(target_creds.get_dn()),
+ 'msDS-AssignedAuthNPolicy', str(policy))
+
+ client_service_tkt = self.get_service_ticket(
+ client_tgt,
+ service_creds,
+ kdc_options=client_tkt_options,
+ expected_flags=expected_flags)
+
+ kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt'))
+
+ target_decryption_key = self.TicketDecryptionKey_from_creds(
+ target_creds)
+ target_etypes = target_creds.tgs_supported_enctypes
+
+ service_name = service_creds.get_username()
+ if service_name[-1] == '$':
+ service_name = service_name[:-1]
+ expected_transited_services = [
+ f'host/{service_name}@{service_creds.get_realm()}'
+ ]
+
+ # Show that obtaining a service ticket with constrained delegation is
+ # allowed.
+ self._tgs_req(service_tgt, 0, service_creds, target_creds,
+ armor_tgt=mach_tgt,
+ kdc_options=kdc_options,
+ expected_cname=client_cname,
+ expected_account_name=client_username,
+ additional_ticket=client_service_tkt,
+ decryption_key=target_decryption_key,
+ expected_sid=client_sid,
+ expected_supported_etypes=target_etypes,
+ expected_proxy_target=target_spn,
+ expected_transited_services=expected_transited_services)
+
+ def test_authn_policy_allowed_to_user_deny_constrained_delegation(self):
+ samdb = self.get_samdb()
+
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ client_dn = client_creds.get_dn()
+ client_sid = self.get_objectSid(samdb, client_dn)
+
+ client_tkt_options = 'forwardable'
+ expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
+
+ client_tgt = self.get_tgt(client_creds,
+ kdc_options=client_tkt_options,
+ expected_flags=expected_flags)
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a target account.
+ target_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts={'id': 1},
+ use_cache=False)
+ target_spn = target_creds.get_spn()
+
+ service_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts={
+ 'delegation_to_spn': target_spn,
+ })
+ service_sid = self.get_objectSid(samdb, service_creds.get_dn())
+ service_tgt = self.get_tgt(service_creds)
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly denies the service account to obtain a service ticket,
+ # while allowing the user.
+ denied = f'O:SYD:(D;;CR;;;{service_sid})(A;;CR;;;{client_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ computer_allowed_to=denied)
+
+ # Assign the policy to the target account.
+ self.add_attribute(samdb, str(target_creds.get_dn()),
+ 'msDS-AssignedAuthNPolicy', str(policy))
+
+ client_service_tkt = self.get_service_ticket(
+ client_tgt,
+ service_creds,
+ kdc_options=client_tkt_options,
+ expected_flags=expected_flags)
+
+ kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt'))
+
+ target_decryption_key = self.TicketDecryptionKey_from_creds(
+ target_creds)
+
+ # Show that obtaining a service ticket with constrained delegation is
+ # not allowed.
+ self._tgs_req(
+ service_tgt, KDC_ERR_POLICY, service_creds, target_creds,
+ armor_tgt=mach_tgt,
+ kdc_options=kdc_options,
+ additional_ticket=client_service_tkt,
+ decryption_key=target_decryption_key,
+ expect_edata=self.expect_padata_outer,
+ # We aren’t particular about whether or not we get an NTSTATUS.
+ expect_status=None,
+ expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
+ check_patypes=False)
+
+ def test_authn_policy_allowed_to_user_allow_constrained_delegation_wrong_sname(self):
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER,
+ use_cache=False)
+
+ client_tkt_options = 'forwardable'
+ expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
+
+ client_tgt = self.get_tgt(client_creds,
+ kdc_options=client_tkt_options,
+ expected_flags=expected_flags)
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a target account.
+ target_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts={'id': 1})
+ target_spn = target_creds.get_spn()
+
+ service_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts={'delegation_to_spn': target_spn})
+ service_tgt = self.get_tgt(service_creds)
+
+ client_service_tkt = self.get_service_ticket(
+ client_tgt,
+ service_creds,
+ kdc_options=client_tkt_options,
+ expected_flags=expected_flags,
+ fresh=True)
+ # Change the ‘sname’ of the ticket to an incorrect value.
+ client_service_tkt.set_sname(self.get_krbtgt_sname())
+
+ kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt'))
+
+ target_decryption_key = self.TicketDecryptionKey_from_creds(
+ target_creds)
+
+ # Show that obtaining a service ticket with constrained delegation
+ # fails if the sname doesn’t match.
+ self._tgs_req(service_tgt, KDC_ERR_BADOPTION,
+ service_creds, target_creds,
+ armor_tgt=mach_tgt,
+ kdc_options=kdc_options,
+ additional_ticket=client_service_tkt,
+ decryption_key=target_decryption_key,
+ expect_edata=self.expect_padata_outer,
+ check_patypes=False)
+
+ def test_authn_policy_allowed_to_user_allow_rbcd(self):
+ samdb = self.get_samdb()
+ functional_level = self.get_domain_functional_level(samdb)
+
+ if functional_level < dsdb.DS_DOMAIN_FUNCTION_2008:
+ self.skipTest('RBCD requires FL2008')
+
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ client_dn = client_creds.get_dn()
+ client_sid = self.get_objectSid(samdb, client_dn)
+
+ client_username = client_creds.get_username()
+ client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
+ names=[client_username])
+
+ client_tkt_options = 'forwardable'
+ expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
+
+ client_tgt = self.get_tgt(client_creds,
+ kdc_options=client_tkt_options,
+ expected_flags=expected_flags)
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ service_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts={'id': 1})
+ service_sid = self.get_objectSid(samdb, service_creds.get_dn())
+ service_tgt = self.get_tgt(service_creds)
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly allows the service account to obtain a service ticket,
+ # while denying the user.
+ allowed = f'O:SYD:(A;;CR;;;{service_sid})(D;;CR;;;{client_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ computer_allowed_to=allowed)
+
+ # Create a target account with the assigned policy.
+ target_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts={
+ 'assigned_policy': str(policy),
+ 'delegation_from_dn': str(service_creds.get_dn()),
+ })
+
+ client_service_tkt = self.get_service_ticket(
+ client_tgt,
+ service_creds,
+ kdc_options=client_tkt_options,
+ expected_flags=expected_flags)
+
+ kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt'))
+
+ target_decryption_key = self.TicketDecryptionKey_from_creds(
+ target_creds)
+ target_etypes = target_creds.tgs_supported_enctypes
+
+ service_name = service_creds.get_username()
+ if service_name[-1] == '$':
+ service_name = service_name[:-1]
+ expected_transited_services = [
+ f'host/{service_name}@{service_creds.get_realm()}'
+ ]
+
+ # Show that obtaining a service ticket with RBCD is allowed.
+ self._tgs_req(service_tgt, 0, service_creds, target_creds,
+ armor_tgt=mach_tgt,
+ kdc_options=kdc_options,
+ pac_options='1001', # supports claims, RBCD
+ expected_cname=client_cname,
+ expected_account_name=client_username,
+ additional_ticket=client_service_tkt,
+ decryption_key=target_decryption_key,
+ expected_sid=client_sid,
+ expected_supported_etypes=target_etypes,
+ expected_proxy_target=target_creds.get_spn(),
+ expected_transited_services=expected_transited_services)
+
+ def test_authn_policy_allowed_to_user_deny_rbcd(self):
+ samdb = self.get_samdb()
+ functional_level = self.get_domain_functional_level(samdb)
+
+ if functional_level < dsdb.DS_DOMAIN_FUNCTION_2008:
+ self.skipTest('RBCD requires FL2008')
+
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ client_dn = client_creds.get_dn()
+ client_sid = self.get_objectSid(samdb, client_dn)
+
+ client_tkt_options = 'forwardable'
+ expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
+
+ client_tgt = self.get_tgt(client_creds,
+ kdc_options=client_tkt_options,
+ expected_flags=expected_flags)
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ service_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts={'id': 1})
+ service_sid = self.get_objectSid(samdb, service_creds.get_dn())
+ service_tgt = self.get_tgt(service_creds)
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly denies the service account to obtain a service ticket,
+ # while allowing the user.
+ denied = f'O:SYD:(D;;CR;;;{service_sid})(A;;CR;;;{client_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ computer_allowed_to=denied)
+
+ # Create a target account with the assigned policy.
+ target_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts={
+ 'assigned_policy': str(policy),
+ 'delegation_from_dn': str(service_creds.get_dn()),
+ })
+
+ client_service_tkt = self.get_service_ticket(
+ client_tgt,
+ service_creds,
+ kdc_options=client_tkt_options,
+ expected_flags=expected_flags)
+
+ kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt'))
+
+ target_decryption_key = self.TicketDecryptionKey_from_creds(
+ target_creds)
+
+ # Show that obtaining a service ticket with RBCD is not allowed.
+ self._tgs_req(service_tgt, KDC_ERR_POLICY, service_creds, target_creds,
+ armor_tgt=mach_tgt,
+ kdc_options=kdc_options,
+ pac_options='1001', # supports claims, RBCD
+ additional_ticket=client_service_tkt,
+ decryption_key=target_decryption_key,
+ expect_edata=self.expect_padata_outer,
+ check_patypes=False)
+
+ def test_authn_policy_allowed_to_user_allow_rbcd_wrong_sname(self):
+ samdb = self.get_samdb()
+ functional_level = self.get_domain_functional_level(samdb)
+
+ if functional_level < dsdb.DS_DOMAIN_FUNCTION_2008:
+ self.skipTest('RBCD requires FL2008')
+
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER,
+ use_cache=False)
+
+ client_tkt_options = 'forwardable'
+ expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
+
+ client_tgt = self.get_tgt(client_creds,
+ kdc_options=client_tkt_options,
+ expected_flags=expected_flags)
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ service_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts={'id': 1})
+ service_tgt = self.get_tgt(service_creds)
+
+ # Create a target account with the assigned policy.
+ target_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts={
+ 'delegation_from_dn': str(service_creds.get_dn()),
+ })
+
+ client_service_tkt = self.get_service_ticket(
+ client_tgt,
+ service_creds,
+ kdc_options=client_tkt_options,
+ expected_flags=expected_flags,
+ fresh=True)
+ # Change the ‘sname’ of the ticket to an incorrect value.
+ client_service_tkt.set_sname(self.get_krbtgt_sname())
+
+ kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt'))
+
+ target_decryption_key = self.TicketDecryptionKey_from_creds(
+ target_creds)
+
+ # Show that obtaining a service ticket with RBCD fails if the sname
+ # doesn’t match.
+ self._tgs_req(service_tgt, KDC_ERR_BADOPTION,
+ service_creds, target_creds,
+ armor_tgt=mach_tgt,
+ kdc_options=kdc_options,
+ pac_options='1001', # supports claims, RBCD
+ additional_ticket=client_service_tkt,
+ decryption_key=target_decryption_key,
+ expect_edata=self.expect_padata_outer,
+ check_patypes=False)
+
+ def test_authn_policy_allowed_to_user_allow_constrained_delegation_to_self(self):
+ samdb = self.get_samdb()
+
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ client_dn = client_creds.get_dn()
+ client_sid = self.get_objectSid(samdb, client_dn)
+
+ client_username = client_creds.get_username()
+ client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
+ names=[client_username])
+
+ client_tkt_options = 'forwardable'
+ expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
+
+ client_tgt = self.get_tgt(client_creds,
+ kdc_options=client_tkt_options,
+ expected_flags=expected_flags)
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a service account.
+ service_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts={'id': 1},
+ use_cache=False)
+ service_dn_str = str(service_creds.get_dn())
+ service_spn = service_creds.get_spn()
+ service_sid = self.get_objectSid(samdb, service_creds.get_dn())
+ service_tgt = self.get_tgt(service_creds)
+
+ # Allow delegation to ourselves.
+ self.add_attribute(samdb, service_dn_str,
+ 'msDS-AllowedToDelegateTo', service_spn)
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly allows the client account to obtain a service ticket,
+ # while denying the service.
+ allowed = f'O:SYD:(A;;CR;;;{client_sid})(D;;CR;;;{service_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ computer_allowed_to=allowed)
+
+ # Assign the policy to the service account.
+ self.add_attribute(samdb, service_dn_str,
+ 'msDS-AssignedAuthNPolicy', str(policy))
+
+ client_service_tkt = self.get_service_ticket(
+ client_tgt,
+ service_creds,
+ kdc_options=client_tkt_options,
+ expected_flags=expected_flags)
+
+ kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt'))
+
+ target_decryption_key = self.TicketDecryptionKey_from_creds(
+ service_creds)
+ target_etypes = service_creds.tgs_supported_enctypes
+
+ service_name = service_creds.get_username()
+ if service_name[-1] == '$':
+ service_name = service_name[:-1]
+ expected_transited_services = [
+ f'host/{service_name}@{service_creds.get_realm()}'
+ ]
+
+ # Show that obtaining a service ticket to ourselves with constrained
+ # delegation is allowed.
+ self._tgs_req(service_tgt, 0, service_creds, service_creds,
+ armor_tgt=mach_tgt,
+ kdc_options=kdc_options,
+ expected_cname=client_cname,
+ expected_account_name=client_username,
+ additional_ticket=client_service_tkt,
+ decryption_key=target_decryption_key,
+ expected_sid=client_sid,
+ expected_supported_etypes=target_etypes,
+ expected_proxy_target=service_spn,
+ expected_transited_services=expected_transited_services)
+
+ def test_authn_policy_allowed_to_user_deny_constrained_delegation_to_self(self):
+ samdb = self.get_samdb()
+
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ client_dn = client_creds.get_dn()
+ client_sid = self.get_objectSid(samdb, client_dn)
+
+ client_username = client_creds.get_username()
+ client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
+ names=[client_username])
+
+ client_tkt_options = 'forwardable'
+ expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
+
+ client_tgt = self.get_tgt(client_creds,
+ kdc_options=client_tkt_options,
+ expected_flags=expected_flags)
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a service account.
+ service_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts={'id': 1},
+ use_cache=False)
+ service_dn_str = str(service_creds.get_dn())
+ service_spn = service_creds.get_spn()
+ service_sid = self.get_objectSid(samdb, service_creds.get_dn())
+ service_tgt = self.get_tgt(service_creds)
+
+ # Allow delegation to ourselves.
+ self.add_attribute(samdb, service_dn_str,
+ 'msDS-AllowedToDelegateTo', service_spn)
+
+ client_service_tkt = self.get_service_ticket(
+ client_tgt,
+ service_creds,
+ kdc_options=client_tkt_options,
+ expected_flags=expected_flags)
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly denies the client account to obtain a service ticket,
+ # while allowing the service.
+ allowed = f'O:SYD:(D;;CR;;;{client_sid})(A;;CR;;;{service_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ computer_allowed_to=allowed)
+
+ # Assign the policy to the service account.
+ self.add_attribute(samdb, service_dn_str,
+ 'msDS-AssignedAuthNPolicy', str(policy))
+
+ kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt'))
+
+ target_decryption_key = self.TicketDecryptionKey_from_creds(
+ service_creds)
+ target_etypes = service_creds.tgs_supported_enctypes
+
+ service_name = service_creds.get_username()
+ if service_name[-1] == '$':
+ service_name = service_name[:-1]
+ expected_transited_services = [
+ f'host/{service_name}@{service_creds.get_realm()}'
+ ]
+
+ # Show that obtaining a service ticket to ourselves with constrained
+ # delegation is allowed, despite the policy’s disallowing it.
+ self._tgs_req(service_tgt, 0, service_creds, service_creds,
+ armor_tgt=mach_tgt,
+ kdc_options=kdc_options,
+ expected_cname=client_cname,
+ expected_account_name=client_username,
+ additional_ticket=client_service_tkt,
+ decryption_key=target_decryption_key,
+ expected_sid=client_sid,
+ expected_supported_etypes=target_etypes,
+ expected_proxy_target=service_spn,
+ expected_transited_services=expected_transited_services)
+
+ def test_authn_policy_allowed_to_user_not_allowed_constrained_delegation_to_self(self):
+ samdb = self.get_samdb()
+
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ client_dn = client_creds.get_dn()
+ client_sid = self.get_objectSid(samdb, client_dn)
+
+ client_tkt_options = 'forwardable'
+ expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
+
+ client_tgt = self.get_tgt(client_creds,
+ kdc_options=client_tkt_options,
+ expected_flags=expected_flags)
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a service account.
+ service_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts={'id': 1},
+ use_cache=False)
+ service_dn_str = str(service_creds.get_dn())
+ service_sid = self.get_objectSid(samdb, service_creds.get_dn())
+ service_tgt = self.get_tgt(service_creds)
+
+ # Don’t set msDS-AllowedToDelegateTo.
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly allows the client account to obtain a service ticket,
+ # while denying the service.
+ allowed = f'O:SYD:(A;;CR;;;{client_sid})(D;;CR;;;{service_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ computer_allowed_to=allowed)
+
+ # Assign the policy to the service account.
+ self.add_attribute(samdb, service_dn_str,
+ 'msDS-AssignedAuthNPolicy', str(policy))
+
+ client_service_tkt = self.get_service_ticket(
+ client_tgt,
+ service_creds,
+ kdc_options=client_tkt_options,
+ expected_flags=expected_flags)
+
+ kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt'))
+
+ target_decryption_key = self.TicketDecryptionKey_from_creds(
+ service_creds)
+
+ # Show that obtaining a service ticket to ourselves with constrained
+ # delegation is not allowed without msDS-AllowedToDelegateTo.
+ self._tgs_req(service_tgt, KDC_ERR_BADOPTION,
+ service_creds, service_creds,
+ armor_tgt=mach_tgt,
+ kdc_options=kdc_options,
+ additional_ticket=client_service_tkt,
+ decryption_key=target_decryption_key,
+ expect_edata=self.expect_padata_outer,
+ check_patypes=False)
+
+ def test_authn_policy_allowed_to_user_allow_rbcd_to_self(self):
+ samdb = self.get_samdb()
+ functional_level = self.get_domain_functional_level(samdb)
+
+ if functional_level < dsdb.DS_DOMAIN_FUNCTION_2008:
+ self.skipTest('RBCD requires FL2008')
+
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ client_dn = client_creds.get_dn()
+ client_sid = self.get_objectSid(samdb, client_dn)
+
+ client_username = client_creds.get_username()
+ client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
+ names=[client_username])
+
+ client_tkt_options = 'forwardable'
+ expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
+
+ client_tgt = self.get_tgt(client_creds,
+ kdc_options=client_tkt_options,
+ expected_flags=expected_flags)
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a service account allowed to delegate to itself. We can’t use
+ # a more specific ACE containing the account’s SID (obtained
+ # post-creation) as Samba (unlike Windows) won’t let us modify
+ # msDS-AllowedToActOnBehalfOfOtherIdentity without being System.
+ domain_sid = security.dom_sid(samdb.get_domain_sid())
+ security_descriptor = security.descriptor.from_sddl(
+ 'O:BAD:(A;;CR;;;WD)', domain_sid)
+ service_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts={'delegation_from_dn': ndr_pack(security_descriptor)},
+ use_cache=False)
+ service_dn_str = str(service_creds.get_dn())
+ service_sid = self.get_objectSid(samdb, service_creds.get_dn())
+ service_tgt = self.get_tgt(service_creds)
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly allows the client account to obtain a service ticket,
+ # while denying the service.
+ allowed = f'O:SYD:(A;;CR;;;{client_sid})(D;;CR;;;{service_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ computer_allowed_to=allowed)
+
+ # Assign the policy to the service account.
+ self.add_attribute(samdb, service_dn_str,
+ 'msDS-AssignedAuthNPolicy', str(policy))
+
+ client_service_tkt = self.get_service_ticket(
+ client_tgt,
+ service_creds,
+ kdc_options=client_tkt_options,
+ expected_flags=expected_flags)
+
+ kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt'))
+
+ service_decryption_key = self.TicketDecryptionKey_from_creds(
+ service_creds)
+ service_etypes = service_creds.tgs_supported_enctypes
+
+ service_name = service_creds.get_username()
+ if service_name[-1] == '$':
+ service_name = service_name[:-1]
+ expected_transited_services = [
+ f'host/{service_name}@{service_creds.get_realm()}'
+ ]
+
+ # Show that obtaining a service ticket to ourselves with RBCD is
+ # allowed.
+ self._tgs_req(service_tgt, 0, service_creds, service_creds,
+ armor_tgt=mach_tgt,
+ kdc_options=kdc_options,
+ pac_options='1001', # supports claims, RBCD
+ expected_cname=client_cname,
+ expected_account_name=client_username,
+ additional_ticket=client_service_tkt,
+ decryption_key=service_decryption_key,
+ expected_sid=client_sid,
+ expected_supported_etypes=service_etypes,
+ expected_proxy_target=service_creds.get_spn(),
+ expected_transited_services=expected_transited_services)
+
+ def test_authn_policy_allowed_to_user_deny_rbcd_to_self(self):
+ samdb = self.get_samdb()
+ functional_level = self.get_domain_functional_level(samdb)
+
+ if functional_level < dsdb.DS_DOMAIN_FUNCTION_2008:
+ self.skipTest('RBCD requires FL2008')
+
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ client_dn = client_creds.get_dn()
+ client_sid = self.get_objectSid(samdb, client_dn)
+
+ client_username = client_creds.get_username()
+ client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
+ names=[client_username])
+
+ client_tkt_options = 'forwardable'
+ expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
+
+ client_tgt = self.get_tgt(client_creds,
+ kdc_options=client_tkt_options,
+ expected_flags=expected_flags)
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a service account allowed to delegate to itself. We can’t use
+ # a more specific ACE containing the account’s SID (obtained
+ # post-creation) as Samba (unlike Windows) won’t let us modify
+ # msDS-AllowedToActOnBehalfOfOtherIdentity without being System.
+ domain_sid = security.dom_sid(samdb.get_domain_sid())
+ security_descriptor = security.descriptor.from_sddl(
+ 'O:BAD:(A;;CR;;;WD)', domain_sid)
+ service_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts={'delegation_from_dn': ndr_pack(security_descriptor)},
+ use_cache=False)
+ service_dn_str = str(service_creds.get_dn())
+ service_sid = self.get_objectSid(samdb, service_creds.get_dn())
+ service_tgt = self.get_tgt(service_creds)
+
+ client_service_tkt = self.get_service_ticket(
+ client_tgt,
+ service_creds,
+ kdc_options=client_tkt_options,
+ expected_flags=expected_flags)
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly denies the client account to obtain a service ticket,
+ # while allowing the service.
+ allowed = f'O:SYD:(D;;CR;;;{client_sid})(A;;CR;;;{service_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ computer_allowed_to=allowed)
+
+ # Assign the policy to the service account.
+ self.add_attribute(samdb, service_dn_str,
+ 'msDS-AssignedAuthNPolicy', str(policy))
+
+ kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt'))
+
+ service_decryption_key = self.TicketDecryptionKey_from_creds(
+ service_creds)
+ service_etypes = service_creds.tgs_supported_enctypes
+
+ service_name = service_creds.get_username()
+ if service_name[-1] == '$':
+ service_name = service_name[:-1]
+ expected_transited_services = [
+ f'host/{service_name}@{service_creds.get_realm()}'
+ ]
+
+ # Show that obtaining a service ticket to ourselves with RBCD is
+ # allowed, despite the policy’s disallowing it.
+ self._tgs_req(service_tgt, 0, service_creds, service_creds,
+ armor_tgt=mach_tgt,
+ kdc_options=kdc_options,
+ pac_options='1001', # supports claims, RBCD
+ expected_cname=client_cname,
+ expected_account_name=client_username,
+ additional_ticket=client_service_tkt,
+ decryption_key=service_decryption_key,
+ expected_sid=client_sid,
+ expected_supported_etypes=service_etypes,
+ expected_proxy_target=service_creds.get_spn(),
+ expected_transited_services=expected_transited_services)
+
+ def test_authn_policy_allowed_to_user_not_allowed_rbcd_to_self(self):
+ samdb = self.get_samdb()
+ functional_level = self.get_domain_functional_level(samdb)
+
+ if functional_level < dsdb.DS_DOMAIN_FUNCTION_2008:
+ self.skipTest('RBCD requires FL2008')
+
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ client_dn = client_creds.get_dn()
+ client_sid = self.get_objectSid(samdb, client_dn)
+
+ client_tkt_options = 'forwardable'
+ expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
+
+ client_tgt = self.get_tgt(client_creds,
+ kdc_options=client_tkt_options,
+ expected_flags=expected_flags)
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a service account.
+ service_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts={'id': 1},
+ use_cache=False)
+ service_dn_str = str(service_creds.get_dn())
+ service_sid = self.get_objectSid(samdb, service_creds.get_dn())
+ service_tgt = self.get_tgt(service_creds)
+
+ # Don’t set msDS-AllowedToActOnBehalfOfOtherIdentity.
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly allows the client account to obtain a service ticket,
+ # while denying the service.
+ allowed = f'O:SYD:(A;;CR;;;{client_sid})(D;;CR;;;{service_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ computer_allowed_to=allowed)
+
+ # Assign the policy to the service account.
+ self.add_attribute(samdb, service_dn_str,
+ 'msDS-AssignedAuthNPolicy', str(policy))
+
+ client_service_tkt = self.get_service_ticket(
+ client_tgt,
+ service_creds,
+ kdc_options=client_tkt_options,
+ expected_flags=expected_flags)
+
+ kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt'))
+
+ service_decryption_key = self.TicketDecryptionKey_from_creds(
+ service_creds)
+
+ # Show that obtaining a service ticket to ourselves with RBCD
+ # is not allowed without msDS-AllowedToActOnBehalfOfOtherIdentity.
+ self._tgs_req(service_tgt, KDC_ERR_BADOPTION,
+ service_creds, service_creds,
+ armor_tgt=mach_tgt,
+ kdc_options=kdc_options,
+ pac_options='1001', # supports claims, RBCD
+ additional_ticket=client_service_tkt,
+ decryption_key=service_decryption_key,
+ expect_edata=self.expect_padata_outer,
+ check_patypes=False)
+
+ def test_authn_policy_allowed_to_computer_allow_user2user(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ client_creds = self.get_mach_creds()
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+ client_tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly allows the user account to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{client_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ computer_allowed_to=allowed)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+ target_tgt = self._get_tgt(target_creds)
+
+ kdc_options = str(krb5_asn1.KDCOptions('enc-tkt-in-skey'))
+
+ # Show that obtaining a service ticket with user-to-user is allowed.
+ self._tgs_req(client_tgt, 0, client_creds, target_creds,
+ armor_tgt=mach_tgt,
+ kdc_options=kdc_options,
+ additional_ticket=target_tgt)
+
+ def test_authn_policy_allowed_to_computer_deny_user2user(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ client_creds = self.get_mach_creds()
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+ client_tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly denies the user account to obtain a service ticket.
+ denied = f'O:SYD:(D;;CR;;;{client_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ computer_allowed_to=denied)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+ target_tgt = self._get_tgt(target_creds)
+
+ kdc_options = str(krb5_asn1.KDCOptions('enc-tkt-in-skey'))
+
+ # Show that obtaining a service ticket with user-to-user is not
+ # allowed.
+ self._tgs_req(
+ client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
+ armor_tgt=mach_tgt,
+ kdc_options=kdc_options,
+ additional_ticket=target_tgt,
+ expect_edata=self.expect_padata_outer,
+ # We aren’t particular about whether or not we get an NTSTATUS.
+ expect_status=None,
+ expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
+ check_patypes=False)
+
+ def test_authn_policy_allowed_to_user_derived_class_allow(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+ tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that applies to a user and explicitly
+ # allows the user account to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{client_sid})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=allowed,
+ computer_allowed_to=denied,
+ service_allowed_to=denied)
+
+ # Create a schema class derived from ‘user’.
+ class_id = random.randint(0, 100000000)
+ user_class_cn = f'my-User-Class-{class_id}'
+ user_class = user_class_cn.replace('-', '')
+ class_dn = samdb.get_schema_basedn()
+ class_dn.add_child(f'CN={user_class_cn}')
+ governs_id = f'1.3.6.1.4.1.7165.4.6.2.9.{class_id}'
+
+ samdb.add({
+ 'dn': class_dn,
+ 'objectClass': 'classSchema',
+ 'subClassOf': 'user',
+ 'governsId': governs_id,
+ 'lDAPDisplayName': user_class,
+ })
+
+ # Create an account derived from ‘user’ with the assigned policy.
+ target_name = self.get_new_username()
+ target_creds, target_dn = self.create_account(
+ samdb, target_name,
+ account_type=self.AccountType.USER,
+ spn='host/{account}',
+ additional_details={
+ 'msDS-AssignedAuthNPolicy': str(policy),
+ 'objectClass': user_class,
+ })
+
+ keys = self.get_keys(target_creds)
+ self.creds_set_keys(target_creds, keys)
+
+ # Show that obtaining a service ticket is allowed.
+ self._tgs_req(tgt, 0, client_creds, target_creds,
+ armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_to_computer_derived_class_allow(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+ tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly allows the user account to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{client_sid})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Create a schema class derived from ‘computer’.
+ class_id = random.randint(0, 100000000)
+ computer_class_cn = f'my-Computer-Class-{class_id}'
+ computer_class = computer_class_cn.replace('-', '')
+ class_dn = samdb.get_schema_basedn()
+ class_dn.add_child(f'CN={computer_class_cn}')
+ governs_id = f'1.3.6.1.4.1.7165.4.6.2.9.{class_id}'
+
+ samdb.add({
+ 'dn': class_dn,
+ 'objectClass': 'classSchema',
+ 'subClassOf': 'computer',
+ 'governsId': governs_id,
+ 'lDAPDisplayName': computer_class,
+ })
+
+ # Create an account derived from ‘computer’ with the assigned policy.
+ target_name = self.get_new_username()
+ target_creds, target_dn = self.create_account(
+ samdb, target_name,
+ account_type=self.AccountType.COMPUTER,
+ spn=f'host/{target_name}',
+ additional_details={
+ 'msDS-AssignedAuthNPolicy': str(policy),
+ 'objectClass': computer_class,
+ })
+
+ keys = self.get_keys(target_creds)
+ self.creds_set_keys(target_creds, keys)
+
+ # Show that obtaining a service ticket is allowed.
+ self._tgs_req(tgt, 0, client_creds, target_creds,
+ armor_tgt=mach_tgt)
+
+ def test_authn_policy_allowed_to_service_derived_class_allow(self):
+ samdb = self.get_samdb()
+
+ # Create a machine account with which to perform FAST.
+ mach_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER)
+ mach_tgt = self.get_tgt(mach_creds)
+
+ # Create a user account.
+ client_creds = self.get_cached_creds(
+ account_type=self.AccountType.USER)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+ tgt = self.get_tgt(client_creds)
+
+ # Create an authentication policy that applies to a managed service and
+ # explicitly allows the user account to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{client_sid})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=denied,
+ service_allowed_to=allowed)
+
+ # Create a schema class derived from ‘msDS-ManagedServiceAccount’.
+ class_id = random.randint(0, 100000000)
+ service_class_cn = f'my-Managed-Service-Class-{class_id}'
+ service_class = service_class_cn.replace('-', '')
+ class_dn = samdb.get_schema_basedn()
+ class_dn.add_child(f'CN={service_class_cn}')
+ governs_id = f'1.3.6.1.4.1.7165.4.6.2.9.{class_id}'
+
+ samdb.add({
+ 'dn': class_dn,
+ 'objectClass': 'classSchema',
+ 'subClassOf': 'msDS-ManagedServiceAccount',
+ 'governsId': governs_id,
+ 'lDAPDisplayName': service_class,
+ })
+
+ # Create an account derived from ‘msDS-ManagedServiceAccount’ with the
+ # assigned policy.
+ target_name = self.get_new_username()
+ target_creds, target_dn = self.create_account(
+ samdb, target_name,
+ account_type=self.AccountType.MANAGED_SERVICE,
+ spn=f'host/{target_name}',
+ additional_details={
+ 'msDS-AssignedAuthNPolicy': str(policy),
+ 'objectClass': service_class,
+ })
+
+ # Show that obtaining a service ticket is allowed.
+ self._tgs_req(tgt, 0, client_creds, target_creds,
+ armor_tgt=mach_tgt)
+
+ def test_authn_policy_ntlm_allow_user(self):
+ # Create an authentication policy allowing NTLM authentication for
+ # users.
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_ntlm=True,
+ user_allowed_from=allowed,
+ service_allowed_ntlm=False,
+ service_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy,
+ ntlm=True)
+
+ # Show that NTLM authentication succeeds.
+ self._connect(client_creds, simple_bind=False)
+
+ def test_authn_policy_ntlm_deny_user(self):
+ # Create an authentication policy denying NTLM authentication for
+ # users.
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_ntlm=False,
+ user_allowed_from=allowed,
+ service_allowed_ntlm=True,
+ service_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy,
+ ntlm=True)
+
+ # Show that NTLM authentication fails.
+ self._connect(client_creds, simple_bind=False,
+ expect_error=f'{HRES_SEC_E_LOGON_DENIED:08X}')
+
+ def test_authn_policy_ntlm_computer(self):
+ # Create an authentication policy denying NTLM authentication.
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_ntlm=False,
+ user_allowed_from=denied,
+ service_allowed_ntlm=False,
+ service_allowed_from=denied)
+
+ # Create a computer account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy,
+ ntlm=True)
+
+ # Show that NTLM authentication succeeds.
+ self._connect(client_creds, simple_bind=False)
+
+ def test_authn_policy_ntlm_allow_service(self):
+ # Create an authentication policy allowing NTLM authentication for
+ # services.
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_ntlm=False,
+ user_allowed_from=allowed,
+ service_allowed_ntlm=True,
+ service_allowed_from=allowed)
+
+ # Create a managed service account with the assigned policy.
+ client_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy,
+ ntlm=True)
+
+ # Show that NTLM authentication succeeds.
+ self._connect(client_creds, simple_bind=False)
+
+ def test_authn_policy_ntlm_deny_service(self):
+ # Create an authentication policy denying NTLM authentication for
+ # services.
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_ntlm=True,
+ user_allowed_from=allowed,
+ service_allowed_ntlm=False,
+ service_allowed_from=allowed)
+
+ # Create a managed service account with the assigned policy.
+ client_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy,
+ ntlm=True)
+
+ # Show that NTLM authentication fails.
+ self._connect(client_creds, simple_bind=False,
+ expect_error=f'{HRES_SEC_E_LOGON_DENIED:08X}')
+
+ def test_authn_policy_ntlm_deny_no_device_restrictions(self):
+ # Create an authentication policy denying NTLM authentication for
+ # users.
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_ntlm=False,
+ service_allowed_ntlm=True)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy,
+ ntlm=True)
+
+ # Show that without AllowedToAuthenticateFrom set in the policy, NTLM
+ # authentication succeeds.
+ self._connect(client_creds, simple_bind=False)
+
+ def test_authn_policy_simple_bind_allow_user(self):
+ # Create an authentication policy allowing NTLM authentication for
+ # users.
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_ntlm=True,
+ user_allowed_from=allowed,
+ service_allowed_ntlm=False,
+ service_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy,
+ ntlm=True)
+
+ # Show that a simple bind succeeds.
+ self._connect(client_creds, simple_bind=True)
+
+ def test_authn_policy_simple_bind_deny_user(self):
+ # Create an authentication policy denying NTLM authentication for
+ # users.
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_ntlm=False,
+ user_allowed_from=allowed,
+ service_allowed_ntlm=True,
+ service_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy,
+ ntlm=True)
+
+ # Show that a simple bind fails.
+ self._connect(client_creds, simple_bind=True,
+ expect_error=f'{HRES_SEC_E_INVALID_TOKEN:08X}')
+
+ def test_authn_policy_simple_bind_deny_no_device_restrictions(self):
+ # Create an authentication policy denying NTLM authentication for
+ # users.
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_ntlm=False,
+ service_allowed_ntlm=True)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy,
+ ntlm=True)
+
+ # Show that without AllowedToAuthenticateFrom set in the policy, a
+ # simple bind succeeds.
+ self._connect(client_creds, simple_bind=True)
+
+ def test_authn_policy_samr_pwd_change_allow_service_allowed_from(self):
+ # Create an authentication policy allowing NTLM authentication for
+ # managed service accounts.
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ service_allowed_ntlm=True,
+ service_allowed_from=allowed)
+
+ # Create a managed service account with the assigned policy.
+ client_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy,
+ ntlm=True)
+
+ # Show that a SAMR password change is allowed.
+ self._test_samr_change_password(client_creds, expect_error=None)
+
+ def test_authn_policy_samr_pwd_change_allow_service_not_allowed_from(self):
+ # Create an authentication policy allowing NTLM authentication for
+ # managed service accounts.
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ service_allowed_ntlm=True,
+ service_allowed_from=denied)
+
+ # Create a managed service account with the assigned policy.
+ client_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy,
+ ntlm=True)
+
+ # Show that a SAMR password change is allowed.
+ self._test_samr_change_password(client_creds, expect_error=None)
+
+ def test_authn_policy_samr_pwd_change_allow_service_no_allowed_from(self):
+ # Create an authentication policy allowing NTLM authentication for
+ # managed service accounts.
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ service_allowed_ntlm=True)
+
+ # Create a managed service account with the assigned policy.
+ client_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy,
+ ntlm=True)
+
+ # Show that a SAMR password change is allowed.
+ self._test_samr_change_password(client_creds, expect_error=None)
+
+ def test_authn_policy_samr_pwd_change_deny_service_allowed_from(self):
+ # Create an authentication policy denying NTLM authentication for
+ # managed service accounts.
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ service_allowed_ntlm=False,
+ service_allowed_from=allowed)
+
+ # Create a managed service account with the assigned policy.
+ client_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy,
+ ntlm=True)
+
+ # Show that the SAMR connection fails.
+ self._test_samr_change_password(
+ client_creds, expect_error=None,
+ connect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION)
+
+ def test_authn_policy_samr_pwd_change_deny_service_not_allowed_from(self):
+ # Create an authentication policy denying NTLM authentication for
+ # managed service accounts.
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ service_allowed_ntlm=False,
+ service_allowed_from=denied)
+
+ # Create a managed service account with the assigned policy.
+ client_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy,
+ ntlm=True)
+
+ # Show that the SAMR connection fails.
+ self._test_samr_change_password(
+ client_creds, expect_error=None,
+ connect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION)
+
+ def test_authn_policy_samr_pwd_change_deny_service_no_allowed_from(self):
+ # Create an authentication policy denying NTLM authentication for
+ # managed service accounts.
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ service_allowed_ntlm=False)
+
+ # Create a managed service account with the assigned policy.
+ client_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy,
+ ntlm=True)
+
+ # Show that a SAMR password change is allowed.
+ self._test_samr_change_password(client_creds, expect_error=None)
+
+ def test_authn_policy_samlogon_allow_user(self):
+ # Create an authentication policy allowing NTLM authentication for
+ # users.
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_ntlm=True,
+ user_allowed_from=allowed,
+ service_allowed_ntlm=False,
+ service_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy,
+ ntlm=True)
+
+ # Show that a network SamLogon succeeds.
+ self._test_samlogon(creds=client_creds,
+ logon_type=netlogon.NetlogonNetworkInformation)
+
+ # Show that an interactive SamLogon succeeds. Although MS-APDS doesn’t
+ # state it, AllowedNTLMNetworkAuthentication applies to interactive
+ # logons too.
+ self._test_samlogon(creds=client_creds,
+ logon_type=netlogon.NetlogonInteractiveInformation)
+
+ def test_authn_policy_samlogon_deny_user(self):
+ # Create an authentication policy denying NTLM authentication for
+ # users.
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_ntlm=False,
+ user_allowed_from=allowed,
+ service_allowed_ntlm=True,
+ service_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy,
+ ntlm=True)
+
+ # Show that a network SamLogon fails.
+ self._test_samlogon(
+ creds=client_creds,
+ logon_type=netlogon.NetlogonNetworkInformation,
+ expect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION)
+
+ # Show that an interactive SamLogon fails.
+ self._test_samlogon(
+ creds=client_creds,
+ logon_type=netlogon.NetlogonInteractiveInformation,
+ expect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION)
+
+ def test_authn_policy_samlogon_network_computer(self):
+ # Create an authentication policy denying NTLM authentication.
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_ntlm=False,
+ user_allowed_from=denied,
+ service_allowed_ntlm=False,
+ service_allowed_from=denied)
+
+ # Create a computer account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy,
+ ntlm=True)
+
+ # Show that a network SamLogon succeeds.
+ self._test_samlogon(
+ creds=client_creds,
+ logon_type=netlogon.NetlogonNetworkInformation)
+
+ def test_authn_policy_samlogon_interactive_allow_user_allowed_from(self):
+ # Create an authentication policy allowing NTLM authentication for
+ # users.
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_ntlm=True,
+ user_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy,
+ ntlm=True,
+ cached=False)
+
+ # Show that an interactive SamLogon succeeds. Although MS-APDS doesn’t
+ # state it, AllowedNTLMNetworkAuthentication applies to interactive
+ # logons too.
+ self._test_samlogon(creds=client_creds,
+ logon_type=netlogon.NetlogonInteractiveInformation)
+
+ def test_authn_policy_samlogon_interactive_allow_user_not_allowed_from(self):
+ # Create an authentication policy allowing NTLM authentication for
+ # users.
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_ntlm=True,
+ user_allowed_from=denied)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy,
+ ntlm=True,
+ cached=False)
+
+ # Show that an interactive SamLogon succeeds. Although MS-APDS doesn’t
+ # state it, AllowedNTLMNetworkAuthentication applies to interactive
+ # logons too.
+ self._test_samlogon(creds=client_creds,
+ logon_type=netlogon.NetlogonInteractiveInformation)
+
+ def test_authn_policy_samlogon_interactive_allow_user_no_allowed_from(self):
+ # Create an authentication policy allowing NTLM authentication for
+ # users.
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_ntlm=True)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy,
+ ntlm=True,
+ cached=False)
+
+ # Show that an interactive SamLogon succeeds.
+ self._test_samlogon(creds=client_creds,
+ logon_type=netlogon.NetlogonInteractiveInformation)
+
+ def test_authn_policy_samlogon_interactive_deny_user_allowed_from(self):
+ # Create an authentication policy disallowing NTLM authentication for
+ # users.
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_ntlm=False,
+ user_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy,
+ ntlm=True,
+ cached=False)
+
+ # Show that an interactive SamLogon fails.
+ self._test_samlogon(
+ creds=client_creds,
+ logon_type=netlogon.NetlogonInteractiveInformation,
+ expect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION)
+
+ def test_authn_policy_samlogon_interactive_deny_user_not_allowed_from(self):
+ # Create an authentication policy disallowing NTLM authentication for
+ # users.
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_ntlm=False,
+ user_allowed_from=denied)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy,
+ ntlm=True,
+ cached=False)
+
+ # Show that an interactive SamLogon fails.
+ self._test_samlogon(
+ creds=client_creds,
+ logon_type=netlogon.NetlogonInteractiveInformation,
+ expect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION)
+
+ def test_authn_policy_samlogon_interactive_deny_user_no_allowed_from(self):
+ # Create an authentication policy disallowing NTLM authentication for
+ # users.
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_ntlm=False)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy,
+ ntlm=True,
+ cached=False)
+
+ # Show that an interactive SamLogon succeeds.
+ self._test_samlogon(creds=client_creds,
+ logon_type=netlogon.NetlogonInteractiveInformation)
+
+ def test_authn_policy_samlogon_interactive_user_allowed_from(self):
+ # Create an authentication policy not specifying whether NTLM
+ # authentication is allowed or not.
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy,
+ ntlm=True,
+ cached=False)
+
+ # Show that an interactive SamLogon fails.
+ self._test_samlogon(
+ creds=client_creds,
+ logon_type=netlogon.NetlogonInteractiveInformation,
+ expect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION)
+
+ def test_authn_policy_samlogon_network_user_allowed_from(self):
+ # Create an authentication policy not specifying whether NTLM
+ # authentication is allowed or not.
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_from=allowed)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy,
+ ntlm=True,
+ cached=False)
+
+ # Show that a network SamLogon fails.
+ self._test_samlogon(
+ creds=client_creds,
+ logon_type=netlogon.NetlogonNetworkInformation,
+ expect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION)
+
+ def test_authn_policy_samlogon_network_allow_service_allowed_from(self):
+ # Create an authentication policy allowing NTLM authentication for
+ # services.
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ service_allowed_ntlm=True,
+ service_allowed_from=allowed)
+
+ # Create a managed service account with the assigned policy.
+ client_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy,
+ ntlm=True,
+ cached=False)
+
+ # Show that a network SamLogon succeeds.
+ self._test_samlogon(creds=client_creds,
+ logon_type=netlogon.NetlogonNetworkInformation)
+
+ def test_authn_policy_samlogon_network_allow_service_not_allowed_from(self):
+ # Create an authentication policy allowing NTLM authentication for
+ # services.
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ service_allowed_ntlm=True,
+ service_allowed_from=denied)
+
+ # Create a managed service account with the assigned policy.
+ client_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy,
+ ntlm=True,
+ cached=False)
+
+ # Show that a network SamLogon succeeds.
+ self._test_samlogon(creds=client_creds,
+ logon_type=netlogon.NetlogonNetworkInformation)
+
+ def test_authn_policy_samlogon_network_allow_service_no_allowed_from(self):
+ # Create an authentication policy allowing NTLM authentication for
+ # services.
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ service_allowed_ntlm=True)
+
+ # Create a managed service account with the assigned policy.
+ client_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy,
+ ntlm=True,
+ cached=False)
+
+ # Show that a network SamLogon succeeds.
+ self._test_samlogon(creds=client_creds,
+ logon_type=netlogon.NetlogonNetworkInformation)
+
+ def test_authn_policy_samlogon_network_deny_service_allowed_from(self):
+ # Create an authentication policy disallowing NTLM authentication for
+ # services.
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ service_allowed_ntlm=False,
+ service_allowed_from=allowed)
+
+ # Create a managed service account with the assigned policy.
+ client_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy,
+ ntlm=True,
+ cached=False)
+
+ # Show that a network SamLogon fails.
+ self._test_samlogon(
+ creds=client_creds,
+ logon_type=netlogon.NetlogonNetworkInformation,
+ expect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION)
+
+ def test_authn_policy_samlogon_network_deny_service_not_allowed_from(self):
+ # Create an authentication policy disallowing NTLM authentication for
+ # services.
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ service_allowed_ntlm=False,
+ service_allowed_from=denied)
+
+ # Create a managed service account with the assigned policy.
+ client_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy,
+ ntlm=True,
+ cached=False)
+
+ # Show that a network SamLogon fails.
+ self._test_samlogon(
+ creds=client_creds,
+ logon_type=netlogon.NetlogonNetworkInformation,
+ expect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION)
+
+ def test_authn_policy_samlogon_network_deny_service_no_allowed_from(self):
+ # Create an authentication policy disallowing NTLM authentication for
+ # services.
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ service_allowed_ntlm=False)
+
+ # Create a managed service account with the assigned policy.
+ client_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy,
+ ntlm=True,
+ cached=False)
+
+ # Show that a network SamLogon succeeds.
+ self._test_samlogon(creds=client_creds,
+ logon_type=netlogon.NetlogonNetworkInformation)
+
+ def test_authn_policy_samlogon_network_allow_service_allowed_from_to_self(self):
+ # Create an authentication policy allowing NTLM authentication for
+ # services.
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ service_allowed_ntlm=True,
+ service_allowed_from=allowed)
+
+ # Create a managed service account with the assigned policy.
+ client_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy,
+ ntlm=True,
+ cached=False)
+
+ # Show that a network SamLogon to ourselves succeeds.
+ self._test_samlogon(creds=client_creds,
+ domain_joined_mach_creds=client_creds,
+ logon_type=netlogon.NetlogonNetworkInformation)
+
+ def test_authn_policy_samlogon_network_allow_service_not_allowed_from_to_self(self):
+ # Create an authentication policy allowing NTLM authentication for
+ # services.
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ service_allowed_ntlm=True,
+ service_allowed_from=denied)
+
+ # Create a managed service account with the assigned policy.
+ client_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy,
+ ntlm=True,
+ cached=False)
+
+ # Show that a network SamLogon to ourselves succeeds.
+ self._test_samlogon(creds=client_creds,
+ domain_joined_mach_creds=client_creds,
+ logon_type=netlogon.NetlogonNetworkInformation)
+
+ def test_authn_policy_samlogon_network_allow_service_no_allowed_from_to_self(self):
+ # Create an authentication policy allowing NTLM authentication for
+ # services.
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ service_allowed_ntlm=True)
+
+ # Create a managed service account with the assigned policy.
+ client_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy,
+ ntlm=True,
+ cached=False)
+
+ # Show that a network SamLogon to ourselves succeeds.
+ self._test_samlogon(creds=client_creds,
+ domain_joined_mach_creds=client_creds,
+ logon_type=netlogon.NetlogonNetworkInformation)
+
+ def test_authn_policy_samlogon_network_deny_service_allowed_from_to_self(self):
+ # Create an authentication policy disallowing NTLM authentication for
+ # services.
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ service_allowed_ntlm=False,
+ service_allowed_from=allowed)
+
+ # Create a managed service account with the assigned policy.
+ client_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy,
+ ntlm=True,
+ cached=False)
+
+ # Show that a network SamLogon to ourselves fails.
+ self._test_samlogon(
+ creds=client_creds,
+ domain_joined_mach_creds=client_creds,
+ logon_type=netlogon.NetlogonNetworkInformation,
+ expect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION)
+
+ def test_authn_policy_samlogon_network_deny_service_not_allowed_from_to_self(self):
+ # Create an authentication policy disallowing NTLM authentication for
+ # services.
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ service_allowed_ntlm=False,
+ service_allowed_from=denied)
+
+ # Create a managed service account with the assigned policy.
+ client_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy,
+ ntlm=True,
+ cached=False)
+
+ # Show that a network SamLogon to ourselves fails.
+ self._test_samlogon(
+ creds=client_creds,
+ domain_joined_mach_creds=client_creds,
+ logon_type=netlogon.NetlogonNetworkInformation,
+ expect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION)
+
+ def test_authn_policy_samlogon_network_deny_service_no_allowed_from_to_self(self):
+ # Create an authentication policy disallowing NTLM authentication for
+ # services.
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ service_allowed_ntlm=False)
+
+ # Create a managed service account with the assigned policy.
+ client_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy,
+ ntlm=True,
+ cached=False)
+
+ # Show that a network SamLogon to ourselves succeeds.
+ self._test_samlogon(creds=client_creds,
+ domain_joined_mach_creds=client_creds,
+ logon_type=netlogon.NetlogonNetworkInformation)
+
+ def test_authn_policy_samlogon_interactive_deny_no_device_restrictions(self):
+ # Create an authentication policy denying NTLM authentication for
+ # users.
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_ntlm=False,
+ service_allowed_ntlm=True)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy,
+ ntlm=True)
+
+ # Show that without AllowedToAuthenticateFrom set in the policy, an
+ # interactive SamLogon succeeds.
+ self._test_samlogon(creds=client_creds,
+ logon_type=netlogon.NetlogonInteractiveInformation)
+
+ def test_authn_policy_samlogon_network_deny_no_device_restrictions(self):
+ # Create an authentication policy denying NTLM authentication for
+ # users.
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_ntlm=False,
+ service_allowed_ntlm=True)
+
+ # Create a user account with the assigned policy.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ assigned_policy=policy,
+ ntlm=True)
+
+ # Show that without AllowedToAuthenticateFrom set in the policy, a
+ # network SamLogon succeeds.
+ self._test_samlogon(creds=client_creds,
+ logon_type=netlogon.NetlogonNetworkInformation)
+
+ def test_samlogon_allowed_to_computer_allow(self):
+ samdb = self.get_samdb()
+
+ # Create a user account.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ ntlm=True)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly allows the user account to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{client_sid})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that a network SamLogon succeeds.
+ self._test_samlogon(creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonNetworkInformation)
+
+ # Show that an interactive SamLogon succeeds.
+ self._test_samlogon(creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonInteractiveInformation)
+
+ def test_samlogon_allowed_to_computer_deny(self):
+ samdb = self.get_samdb()
+
+ # Create a user account.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ ntlm=True)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly denies the user account to obtain a service ticket.
+ denied = f'O:SYD:(D;;CR;;;{client_sid})'
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=allowed,
+ computer_allowed_to=denied,
+ service_allowed_to=allowed)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that a network SamLogon fails.
+ self._test_samlogon(
+ creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonNetworkInformation,
+ expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
+
+ # Show that an interactive SamLogon fails.
+ self._test_samlogon(
+ creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonInteractiveInformation,
+ expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
+
+ def test_samlogon_allowed_to_computer_deny_protected(self):
+ samdb = self.get_samdb()
+
+ # Create a protected user account.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ protected=True,
+ ntlm=True)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly denies the user account to obtain a service ticket.
+ denied = f'O:SYD:(D;;CR;;;{client_sid})'
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=allowed,
+ computer_allowed_to=denied,
+ service_allowed_to=allowed)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that a network SamLogon fails.
+ self._test_samlogon(
+ creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonNetworkInformation,
+ expect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION)
+
+ # Show that an interactive SamLogon fails.
+ self._test_samlogon(
+ creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonInteractiveInformation,
+ expect_error=ntstatus.NT_STATUS_ACCOUNT_RESTRICTION)
+
+ def test_samlogon_allowed_to_computer_allow_asserted_identity(self):
+ # Create a user account.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ ntlm=True)
+
+ # Create an authentication policy that allows accounts with the
+ # Authentication Authority Asserted Identity SID to obtain a service
+ # ticket.
+ allowed = (
+ f'O:SYD:(A;;CR;;;'
+ f'{security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY})'
+ )
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that a network SamLogon fails.
+ self._test_samlogon(
+ creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonNetworkInformation,
+ expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
+
+ # Show that an interactive SamLogon fails.
+ self._test_samlogon(
+ creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonInteractiveInformation,
+ expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
+
+ def test_samlogon_allowed_to_computer_allow_claims_valid(self):
+ # Create a user account.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ ntlm=True)
+
+ # Create an authentication policy that allows accounts with the Claims
+ # Valid SID to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{security.SID_CLAIMS_VALID})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that a network SamLogon fails.
+ self._test_samlogon(
+ creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonNetworkInformation,
+ expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
+
+ # Show that an interactive SamLogon fails.
+ self._test_samlogon(
+ creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonInteractiveInformation,
+ expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
+
+ def test_samlogon_allowed_to_computer_allow_compounded_auth(self):
+ # Create a user account.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ ntlm=True)
+
+ # Create an authentication policy that allows accounts with the
+ # Compounded Authentication SID to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{security.SID_COMPOUNDED_AUTHENTICATION})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that a network SamLogon fails.
+ self._test_samlogon(
+ creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonNetworkInformation,
+ expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
+
+ # Show that an interactive SamLogon fails.
+ self._test_samlogon(
+ creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonInteractiveInformation,
+ expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
+
+ def test_samlogon_allowed_to_computer_allow_authenticated_users(self):
+ # Create a user account.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ ntlm=True)
+
+ # Create an authentication policy that allows accounts with the
+ # Authenticated Users SID to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{security.SID_NT_AUTHENTICATED_USERS})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that a network SamLogon succeeds.
+ self._test_samlogon(creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonNetworkInformation)
+
+ # Show that an interactive SamLogon succeeds.
+ self._test_samlogon(creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonInteractiveInformation)
+
+ def test_samlogon_allowed_to_computer_allow_ntlm_authn(self):
+ # Create a user account.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ ntlm=True)
+
+ # Create an authentication policy that allows accounts with the NTLM
+ # Authentication SID to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{security.SID_NT_NTLM_AUTHENTICATION})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that a network SamLogon fails.
+ self._test_samlogon(
+ creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonNetworkInformation,
+ expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
+
+ # Show that an interactive SamLogon fails.
+ self._test_samlogon(
+ creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonInteractiveInformation,
+ expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
+
+ def test_samlogon_allowed_to_no_owner(self):
+ samdb = self.get_samdb()
+
+ # Create a user account.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ ntlm=True)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly allows the user account to obtain a service ticket. Omit
+ # the owner (O:SY) from the SDDL.
+ allowed = f'D:(A;;CR;;;{client_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ computer_allowed_to=allowed)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that a network SamLogon fails.
+ self._test_samlogon(
+ creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonNetworkInformation,
+ expect_error=ntstatus.NT_STATUS_INVALID_PARAMETER)
+
+ # Show that an interactive SamLogon fails.
+ self._test_samlogon(
+ creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonInteractiveInformation,
+ expect_error=ntstatus.NT_STATUS_INVALID_PARAMETER)
+
+ def test_samlogon_allowed_to_no_owner_unenforced(self):
+ samdb = self.get_samdb()
+
+ # Create a user account.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ ntlm=True)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+
+ # Create an unenforced authentication policy that applies to a computer
+ # and explicitly allows the user account to obtain a service
+ # ticket. Omit the owner (O:SY) from the SDDL.
+ allowed = f'D:(A;;CR;;;{client_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=False,
+ computer_allowed_to=allowed)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that a network SamLogon succeeds.
+ self._test_samlogon(creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonNetworkInformation)
+
+ # Show that an interactive SamLogon succeeds.
+ self._test_samlogon(creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonInteractiveInformation)
+
+ def test_samlogon_allowed_to_service_allow(self):
+ samdb = self.get_samdb()
+
+ # Create a user account.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ ntlm=True)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+
+ # Create an authentication policy that applies to a managed service and
+ # explicitly allows the user account to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{client_sid})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=denied,
+ service_allowed_to=allowed)
+
+ # Create a managed service account with the assigned policy.
+ target_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy)
+
+ # Show that a network SamLogon succeeds.
+ self._test_samlogon(creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonNetworkInformation)
+
+ # Show that an interactive SamLogon succeeds.
+ self._test_samlogon(creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonInteractiveInformation)
+
+ def test_samlogon_allowed_to_service_deny(self):
+ samdb = self.get_samdb()
+
+ # Create a user account.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ ntlm=True)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+
+ # Create an authentication policy that applies to a managed service and
+ # explicitly denies the user account to obtain a service ticket.
+ denied = f'O:SYD:(D;;CR;;;{client_sid})'
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=allowed,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Create a managed service account with the assigned policy.
+ target_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ assigned_policy=policy)
+
+ # Show that a network SamLogon fails.
+ self._test_samlogon(
+ creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonNetworkInformation,
+ expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
+
+ # Show that an interactive SamLogon fails.
+ self._test_samlogon(
+ creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonInteractiveInformation,
+ expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
+
+ def test_samlogon_allowed_to_computer_allow_group_not_a_member(self):
+ samdb = self.get_samdb()
+
+ # Create a new group.
+ group_name = self.get_new_username()
+ group_dn = self.create_group(samdb, group_name)
+ group_sid = self.get_objectSid(samdb, group_dn)
+
+ # Create a user account which does not belong to the group.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ ntlm=True)
+
+ # Create an authentication policy that allows accounts belonging to the
+ # group.
+ allowed = f'O:SYD:(A;;CR;;;{group_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ computer_allowed_to=allowed)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that a network SamLogon fails, as the user account does not
+ # belong to the group.
+ self._test_samlogon(
+ creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonNetworkInformation,
+ expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
+
+ # Show that an interactive SamLogon fails, as the user account does not
+ # belong to the group.
+ self._test_samlogon(
+ creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonInteractiveInformation,
+ expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
+
+ def test_samlogon_allowed_to_computer_allow_group_member(self):
+ samdb = self.get_samdb()
+
+ # Create a new group.
+ group_name = self.get_new_username()
+ group_dn = self.create_group(samdb, group_name)
+ group_sid = self.get_objectSid(samdb, group_dn)
+
+ # Create a user account that belongs to the group.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ member_of=group_dn,
+ ntlm=True)
+
+ # Create an authentication policy that allows accounts belonging to the
+ # group.
+ allowed = f'O:SYD:(A;;CR;;;{group_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ computer_allowed_to=allowed)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that a network SamLogon succeeds, since the user account belongs
+ # to the group.
+ self._test_samlogon(creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonNetworkInformation)
+
+ # Show that an interactive SamLogon succeeds, since the user account
+ # belongs to the group.
+ self._test_samlogon(creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonInteractiveInformation)
+
+ def test_samlogon_allowed_to_computer_allow_domain_local_group(self):
+ samdb = self.get_samdb()
+
+ # Create a new domain-local group.
+ group_name = self.get_new_username()
+ group_dn = self.create_group(samdb, group_name,
+ gtype=GroupType.DOMAIN_LOCAL.value)
+ group_sid = self.get_objectSid(samdb, group_dn)
+
+ # Create a user account that belongs to the group.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ member_of=group_dn,
+ ntlm=True)
+
+ # Create an authentication policy that allows accounts belonging to the
+ # group.
+ allowed = f'O:SYD:(A;;CR;;;{group_sid})'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ computer_allowed_to=allowed)
+
+ # Create a computer account with the assigned policy.
+ target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ assigned_policy=policy)
+
+ # Show that a network SamLogon succeeds, since the user account belongs
+ # to the group.
+ self._test_samlogon(creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonNetworkInformation)
+
+ # Show that an interactive SamLogon succeeds, since the user account
+ # belongs to the group.
+ self._test_samlogon(creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonInteractiveInformation)
+
+ def test_samlogon_allowed_to_computer_allow_to_self(self):
+ samdb = self.get_samdb()
+
+ # Create a computer account.
+ client_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ ntlm=True,
+ cached=False)
+ client_dn = client_creds.get_dn()
+ client_sid = self.get_objectSid(samdb, client_dn)
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly allows the user account to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{client_sid})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Assign the policy to the account.
+ self.add_attribute(samdb, str(client_dn),
+ 'msDS-AssignedAuthNPolicy', str(policy))
+
+ # Show that a network SamLogon to ourselves succeeds.
+ self._test_samlogon(creds=client_creds,
+ domain_joined_mach_creds=client_creds,
+ logon_type=netlogon.NetlogonNetworkInformation)
+
+ def test_samlogon_allowed_to_computer_deny_to_self(self):
+ samdb = self.get_samdb()
+
+ # Create a computer account.
+ client_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
+ ntlm=True,
+ cached=False)
+ client_dn = client_creds.get_dn()
+ client_sid = self.get_objectSid(samdb, client_dn)
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly denies the user account to obtain a service ticket.
+ denied = f'O:SYD:(D;;CR;;;{client_sid})'
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=allowed,
+ computer_allowed_to=denied,
+ service_allowed_to=allowed)
+
+ # Assign the policy to the account.
+ self.add_attribute(samdb, str(client_dn),
+ 'msDS-AssignedAuthNPolicy', str(policy))
+
+ # Show that a network SamLogon to ourselves fails, despite
+ # authentication being allowed in the Kerberos case.
+ self._test_samlogon(
+ creds=client_creds,
+ domain_joined_mach_creds=client_creds,
+ logon_type=netlogon.NetlogonNetworkInformation,
+ expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
+
+ def test_samlogon_allowed_to_service_allow_to_self(self):
+ samdb = self.get_samdb()
+
+ # Create a managed service account.
+ client_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ ntlm=True,
+ cached=False)
+ client_dn = client_creds.get_dn()
+ client_sid = self.get_objectSid(samdb, client_dn)
+
+ # Create an authentication policy that applies to a managed service and
+ # explicitly allows the user account to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{client_sid})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=denied,
+ service_allowed_to=allowed)
+
+ # Assign the policy to the account.
+ self.add_attribute(samdb, str(client_dn),
+ 'msDS-AssignedAuthNPolicy', str(policy))
+
+ # Show that a network SamLogon to ourselves succeeds.
+ self._test_samlogon(creds=client_creds,
+ domain_joined_mach_creds=client_creds,
+ logon_type=netlogon.NetlogonNetworkInformation)
+
+ def test_samlogon_allowed_to_service_deny_to_self(self):
+ samdb = self.get_samdb()
+
+ # Create a managed service account.
+ client_creds = self._get_creds(
+ account_type=self.AccountType.MANAGED_SERVICE,
+ ntlm=True,
+ cached=False)
+ client_dn = client_creds.get_dn()
+ client_sid = self.get_objectSid(samdb, client_dn)
+
+ # Create an authentication policy that applies to a managed service and
+ # explicitly denies the user account to obtain a service ticket.
+ denied = f'O:SYD:(D;;CR;;;{client_sid})'
+ allowed = 'O:SYD:(A;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=allowed,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Assign the policy to the account.
+ self.add_attribute(samdb, str(client_dn),
+ 'msDS-AssignedAuthNPolicy', str(policy))
+
+ # Show that a network SamLogon to ourselves fails, despite
+ # authentication being allowed in the Kerberos case.
+ self._test_samlogon(
+ creds=client_creds,
+ domain_joined_mach_creds=client_creds,
+ logon_type=netlogon.NetlogonNetworkInformation,
+ expect_error=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
+
+ def test_samlogon_allowed_to_computer_derived_class_allow(self):
+ samdb = self.get_samdb()
+
+ # Create a user account.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ ntlm=True)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+
+ # Create an authentication policy that applies to a computer and
+ # explicitly allows the user account to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{client_sid})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=allowed,
+ service_allowed_to=denied)
+
+ # Create a schema class derived from ‘computer’.
+ class_id = random.randint(0, 100000000)
+ computer_class_cn = f'my-Computer-Class-{class_id}'
+ computer_class = computer_class_cn.replace('-', '')
+ class_dn = samdb.get_schema_basedn()
+ class_dn.add_child(f'CN={computer_class_cn}')
+ governs_id = f'1.3.6.1.4.1.7165.4.6.2.9.{class_id}'
+
+ samdb.add({
+ 'dn': class_dn,
+ 'objectClass': 'classSchema',
+ 'subClassOf': 'computer',
+ 'governsId': governs_id,
+ 'lDAPDisplayName': computer_class,
+ })
+
+ # Create an account derived from ‘computer’ with the assigned policy.
+ target_name = self.get_new_username()
+ target_creds, target_dn = self.create_account(
+ samdb, target_name,
+ account_type=self.AccountType.COMPUTER,
+ spn=f'host/{target_name}',
+ additional_details={
+ 'msDS-AssignedAuthNPolicy': str(policy),
+ 'objectClass': computer_class,
+ })
+
+ keys = self.get_keys(target_creds)
+ self.creds_set_keys(target_creds, keys)
+
+ # Show that a network SamLogon succeeds.
+ self._test_samlogon(creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonNetworkInformation)
+
+ # Show that an interactive SamLogon succeeds.
+ self._test_samlogon(creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonInteractiveInformation)
+
+ def test_samlogon_allowed_to_service_derived_class_allow(self):
+ samdb = self.get_samdb()
+
+ # Create a user account.
+ client_creds = self._get_creds(account_type=self.AccountType.USER,
+ ntlm=True)
+ client_sid = self.get_objectSid(samdb, client_creds.get_dn())
+
+ # Create an authentication policy that applies to a managed service and
+ # explicitly allows the user account to obtain a service ticket.
+ allowed = f'O:SYD:(A;;CR;;;{client_sid})'
+ denied = 'O:SYD:(D;;CR;;;WD)'
+ policy_id = self.get_new_username()
+ policy = self.create_authn_policy(policy_id,
+ enforced=True,
+ user_allowed_to=denied,
+ computer_allowed_to=denied,
+ service_allowed_to=allowed)
+
+ # Create a schema class derived from ‘msDS-ManagedServiceAccount’.
+ class_id = random.randint(0, 100000000)
+ service_class_cn = f'my-Managed-Service-Class-{class_id}'
+ service_class = service_class_cn.replace('-', '')
+ class_dn = samdb.get_schema_basedn()
+ class_dn.add_child(f'CN={service_class_cn}')
+ governs_id = f'1.3.6.1.4.1.7165.4.6.2.9.{class_id}'
+
+ samdb.add({
+ 'dn': class_dn,
+ 'objectClass': 'classSchema',
+ 'subClassOf': 'msDS-ManagedServiceAccount',
+ 'governsId': governs_id,
+ 'lDAPDisplayName': service_class,
+ })
+
+ # Create an account derived from ‘msDS-ManagedServiceAccount’ with the
+ # assigned policy.
+ target_name = self.get_new_username()
+ target_creds, target_dn = self.create_account(
+ samdb, target_name,
+ account_type=self.AccountType.MANAGED_SERVICE,
+ spn=f'host/{target_name}',
+ additional_details={
+ 'msDS-AssignedAuthNPolicy': str(policy),
+ 'objectClass': service_class,
+ })
+
+ # Show that a network SamLogon succeeds.
+ self._test_samlogon(creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonNetworkInformation)
+
+ # Show that an interactive SamLogon succeeds.
+ self._test_samlogon(creds=client_creds,
+ domain_joined_mach_creds=target_creds,
+ logon_type=netlogon.NetlogonInteractiveInformation)
+
+ def check_ticket_times(self,
+ ticket_creds,
+ expected_life=None,
+ expected_renew_life=None):
+ ticket = ticket_creds.ticket_private
+
+ authtime = ticket['authtime']
+ starttime = ticket.get('starttime', authtime)
+ endtime = ticket['endtime']
+ renew_till = ticket.get('renew-till', None)
+
+ starttime = self.get_EpochFromKerberosTime(starttime)
+
+ if expected_life is not None:
+ actual_end = self.get_EpochFromKerberosTime(
+ endtime.decode('ascii'))
+ actual_lifetime = actual_end - starttime
+
+ self.assertEqual(expected_life, actual_lifetime)
+
+ if renew_till is None:
+ self.assertIsNone(expected_renew_life)
+ else:
+ if expected_renew_life is not None:
+ actual_renew_till = self.get_EpochFromKerberosTime(
+ renew_till.decode('ascii'))
+ actual_renew_life = actual_renew_till - starttime
+
+ self.assertEqual(expected_renew_life, actual_renew_life)
+
+ def _get_tgt(self, creds, *,
+ armor_tgt=None,
+ till=None,
+ expected_error=0,
+ expect_status=None,
+ expected_status=None):
+ user_name = creds.get_username()
+ realm = creds.get_realm()
+ salt = creds.get_salt()
+
+ cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
+ names=user_name.split('/'))
+ sname = self.PrincipalName_create(name_type=NT_SRV_INST,
+ names=['krbtgt', realm])
+ expected_sname = self.PrincipalName_create(
+ name_type=NT_SRV_INST, names=['krbtgt', realm.upper()])
+
+ expected_cname = cname
+
+ if till is None:
+ till = self.get_KerberosTime(offset=36000)
+
+ renew_time = till
+
+ krbtgt_creds = self.get_krbtgt_creds()
+ ticket_decryption_key = (
+ self.TicketDecryptionKey_from_creds(krbtgt_creds))
+
+ expected_etypes = krbtgt_creds.tgs_supported_enctypes
+
+ kdc_options = str(krb5_asn1.KDCOptions('renewable'))
+ # Contrary to Microsoft’s documentation, the returned ticket is
+ # renewable.
+ expected_flags = krb5_asn1.TicketFlags('renewable')
+
+ preauth_key = self.PasswordKey_from_creds(creds,
+ kcrypto.Enctype.AES256)
+
+ expected_realm = realm.upper()
+
+ etypes = kcrypto.Enctype.AES256, kcrypto.Enctype.RC4
+
+ if armor_tgt is not None:
+ authenticator_subkey = self.RandomKey(kcrypto.Enctype.AES256)
+ armor_key = self.generate_armor_key(authenticator_subkey,
+ armor_tgt.session_key)
+ armor_subkey = authenticator_subkey
+
+ client_challenge_key = self.generate_client_challenge_key(
+ armor_key, preauth_key)
+ enc_challenge_padata = self.get_challenge_pa_data(
+ client_challenge_key)
+
+ def generate_fast_padata_fn(kdc_exchange_dict,
+ _callback_dict,
+ req_body):
+ return [enc_challenge_padata], req_body
+
+ generate_fast_fn = self.generate_simple_fast
+ generate_fast_armor_fn = self.generate_ap_req
+ generate_padata_fn = None
+
+ fast_armor_type = FX_FAST_ARMOR_AP_REQUEST
+ else:
+ ts_enc_padata = self.get_enc_timestamp_pa_data_from_key(
+ preauth_key)
+
+ def generate_padata_fn(kdc_exchange_dict,
+ _callback_dict,
+ req_body):
+ return [ts_enc_padata], req_body
+
+ generate_fast_fn = None
+ generate_fast_padata_fn = None
+ generate_fast_armor_fn = None
+
+ armor_key = None
+ armor_subkey = None
+
+ fast_armor_type = None
+
+ if not expected_error:
+ check_error_fn = None
+ check_rep_fn = self.generic_check_kdc_rep
+ else:
+ check_error_fn = self.generic_check_kdc_error
+ check_rep_fn = None
+
+ kdc_exchange_dict = self.as_exchange_dict(
+ creds=creds,
+ expected_error_mode=expected_error,
+ expect_status=expect_status,
+ expected_status=expected_status,
+ expected_crealm=expected_realm,
+ expected_cname=expected_cname,
+ expected_srealm=expected_realm,
+ expected_sname=expected_sname,
+ expected_salt=salt,
+ expected_flags=expected_flags,
+ expected_supported_etypes=expected_etypes,
+ generate_padata_fn=generate_padata_fn,
+ generate_fast_padata_fn=generate_fast_padata_fn,
+ generate_fast_fn=generate_fast_fn,
+ generate_fast_armor_fn=generate_fast_armor_fn,
+ fast_armor_type=fast_armor_type,
+ check_error_fn=check_error_fn,
+ check_rep_fn=check_rep_fn,
+ check_kdc_private_fn=self.generic_check_kdc_private,
+ armor_key=armor_key,
+ armor_tgt=armor_tgt,
+ armor_subkey=armor_subkey,
+ kdc_options=kdc_options,
+ preauth_key=preauth_key,
+ ticket_decryption_key=ticket_decryption_key,
+ # PA-DATA types are not important for these tests.
+ check_patypes=False)
+
+ rep = self._generic_kdc_exchange(kdc_exchange_dict,
+ cname=cname,
+ realm=realm,
+ sname=sname,
+ till_time=till,
+ renew_time=renew_time,
+ etypes=etypes)
+ if expected_error:
+ self.check_error_rep(rep, expected_error)
+
+ return None
+
+ self.check_as_reply(rep)
+
+ ticket_creds = kdc_exchange_dict['rep_ticket_creds']
+ return ticket_creds
+
+
+if __name__ == '__main__':
+ global_asn1_print = False
+ global_hexdump = False
+ import unittest
+ unittest.main()
diff --git a/python/samba/tests/krb5/rfc4120_constants.py b/python/samba/tests/krb5/rfc4120_constants.py
index 7ad336208e2..fd9903001fd 100644
--- a/python/samba/tests/krb5/rfc4120_constants.py
+++ b/python/samba/tests/krb5/rfc4120_constants.py
@@ -84,6 +84,7 @@ PADATA_REQ_ENC_PA_REP = int(
# Error codes
KDC_ERR_C_PRINCIPAL_UNKNOWN = 6
KDC_ERR_S_PRINCIPAL_UNKNOWN = 7
+KDC_ERR_NEVER_VALID = 11
KDC_ERR_POLICY = 12
KDC_ERR_BADOPTION = 13
KDC_ERR_ETYPE_NOSUPP = 14
diff --git a/python/samba/tests/usage.py b/python/samba/tests/usage.py
index a5f97e96afb..cd9b33c5d03 100644
--- a/python/samba/tests/usage.py
+++ b/python/samba/tests/usage.py
@@ -120,6 +120,7 @@ EXCLUDE_USAGE = {
'python/samba/tests/krb5/etype_tests.py',
'python/samba/tests/krb5/device_tests.py',
'python/samba/tests/krb5/claims_in_pac.py',
+ 'python/samba/tests/krb5/authn_policy_tests.py',
}
EXCLUDE_HELP = {