# Unix SMB/CIFS implementation. # Copyright (C) Isaac Boukris 2020 # Copyright (C) Stefan Metzmacher 2020 # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # import sys import socket import struct import time import datetime import random import binascii import itertools import collections from enum import Enum from pyasn1.codec.der.decoder import decode as pyasn1_der_decode from pyasn1.codec.der.encoder import encode as pyasn1_der_encode from pyasn1.codec.native.decoder import decode as pyasn1_native_decode from pyasn1.codec.native.encoder import encode as pyasn1_native_encode from pyasn1.codec.ber.encoder import BitStringEncoder from pyasn1.error import PyAsn1Error from samba.credentials import Credentials from samba.dcerpc import claims, krb5pac, netlogon, security from samba.gensec import FEATURE_SEAL from samba.ndr import ndr_pack, ndr_unpack import samba.tests from samba.tests import TestCaseInTempDir import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1 from samba.tests.krb5.rfc4120_constants import ( AD_IF_RELEVANT, AD_WIN2K_PAC, FX_FAST_ARMOR_AP_REQUEST, KDC_ERR_CLIENT_REVOKED, KDC_ERR_GENERIC, KDC_ERR_POLICY, KDC_ERR_PREAUTH_FAILED, KDC_ERR_SKEW, KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS, KERB_ERR_TYPE_EXTENDED, KRB_AP_REP, KRB_AP_REQ, KRB_AS_REP, KRB_AS_REQ, KRB_ERROR, KRB_PRIV, KRB_TGS_REP, KRB_TGS_REQ, KU_AP_REQ_AUTH, KU_AS_REP_ENC_PART, KU_AP_REQ_ENC_PART, KU_AS_REQ, KU_ENC_CHALLENGE_KDC, KU_FAST_ENC, KU_FAST_FINISHED, KU_FAST_REP, KU_FAST_REQ_CHKSUM, KU_KRB_PRIV, KU_NON_KERB_CKSUM_SALT, KU_TGS_REP_ENC_PART_SESSION, KU_TGS_REP_ENC_PART_SUB_KEY, KU_TGS_REQ_AUTH, KU_TGS_REQ_AUTH_CKSUM, KU_TGS_REQ_AUTH_DAT_SESSION, KU_TGS_REQ_AUTH_DAT_SUBKEY, KU_TICKET, NT_PRINCIPAL, NT_SRV_INST, NT_WELLKNOWN, PADATA_ENCRYPTED_CHALLENGE, PADATA_ENC_TIMESTAMP, PADATA_ETYPE_INFO, PADATA_ETYPE_INFO2, PADATA_FOR_USER, PADATA_FX_COOKIE, PADATA_FX_ERROR, PADATA_FX_FAST, PADATA_GSS, PADATA_KDC_REQ, PADATA_PAC_OPTIONS, PADATA_PAC_REQUEST, PADATA_PKINIT_KX, PADATA_PK_AS_REQ, PADATA_PK_AS_REP_19, PADATA_SUPPORTED_ETYPES, PADATA_REQ_ENC_PA_REP ) import samba.tests.krb5.kcrypto as kcrypto from samba.tests.krb5 import xpress def BitStringEncoder_encodeValue32( self, value, asn1Spec, encodeFun, **options): # # BitStrings like KDCOptions or TicketFlags should at least # be 32-Bit on the wire # if asn1Spec is not None: # TODO: try to avoid ASN.1 schema instantiation value = asn1Spec.clone(value) valueLength = len(value) if valueLength % 8: alignedValue = value << (8 - valueLength % 8) else: alignedValue = value substrate = alignedValue.asOctets() length = len(substrate) # We need at least 32-Bit / 4-Bytes if length < 4: padding = 4 - length else: padding = 0 ret = b'\x00' + substrate + (b'\x00' * padding) return ret, False, True BitStringEncoder.encodeValue = BitStringEncoder_encodeValue32 def BitString_NamedValues_prettyPrint(self, scope=0): ret = "%s" % self.asBinary() bits = [] highest_bit = 32 for byte in self.asNumbers(): for bit in [7, 6, 5, 4, 3, 2, 1, 0]: mask = 1 << bit if byte & mask: val = 1 else: val = 0 bits.append(val) if len(bits) < highest_bit: for bitPosition in range(len(bits), highest_bit): bits.append(0) indent = " " * scope delim = ": (\n%s " % indent for bitPosition in range(highest_bit): if bitPosition in self.prettyPrintNamedValues: name = self.prettyPrintNamedValues[bitPosition] elif bits[bitPosition] != 0: name = "unknown-bit-%u" % bitPosition else: continue ret += "%s%s:%u" % (delim, name, bits[bitPosition]) delim = ",\n%s " % indent ret += "\n%s)" % indent return ret krb5_asn1.TicketFlags.prettyPrintNamedValues =\ krb5_asn1.TicketFlagsValues.namedValues krb5_asn1.TicketFlags.namedValues =\ krb5_asn1.TicketFlagsValues.namedValues krb5_asn1.TicketFlags.prettyPrint =\ BitString_NamedValues_prettyPrint krb5_asn1.KDCOptions.prettyPrintNamedValues =\ krb5_asn1.KDCOptionsValues.namedValues krb5_asn1.KDCOptions.namedValues =\ krb5_asn1.KDCOptionsValues.namedValues krb5_asn1.KDCOptions.prettyPrint =\ BitString_NamedValues_prettyPrint krb5_asn1.APOptions.prettyPrintNamedValues =\ krb5_asn1.APOptionsValues.namedValues krb5_asn1.APOptions.namedValues =\ krb5_asn1.APOptionsValues.namedValues krb5_asn1.APOptions.prettyPrint =\ BitString_NamedValues_prettyPrint krb5_asn1.PACOptionFlags.prettyPrintNamedValues =\ krb5_asn1.PACOptionFlagsValues.namedValues krb5_asn1.PACOptionFlags.namedValues =\ krb5_asn1.PACOptionFlagsValues.namedValues krb5_asn1.PACOptionFlags.prettyPrint =\ BitString_NamedValues_prettyPrint def Integer_NamedValues_prettyPrint(self, scope=0): intval = int(self) if intval in self.prettyPrintNamedValues: name = self.prettyPrintNamedValues[intval] else: name = "<__unknown__>" ret = "%d (0x%x) %s" % (intval, intval, name) return ret krb5_asn1.NameType.prettyPrintNamedValues =\ krb5_asn1.NameTypeValues.namedValues krb5_asn1.NameType.prettyPrint =\ Integer_NamedValues_prettyPrint krb5_asn1.AuthDataType.prettyPrintNamedValues =\ krb5_asn1.AuthDataTypeValues.namedValues krb5_asn1.AuthDataType.prettyPrint =\ Integer_NamedValues_prettyPrint krb5_asn1.PADataType.prettyPrintNamedValues =\ krb5_asn1.PADataTypeValues.namedValues krb5_asn1.PADataType.prettyPrint =\ Integer_NamedValues_prettyPrint krb5_asn1.EncryptionType.prettyPrintNamedValues =\ krb5_asn1.EncryptionTypeValues.namedValues krb5_asn1.EncryptionType.prettyPrint =\ Integer_NamedValues_prettyPrint krb5_asn1.ChecksumType.prettyPrintNamedValues =\ krb5_asn1.ChecksumTypeValues.namedValues krb5_asn1.ChecksumType.prettyPrint =\ Integer_NamedValues_prettyPrint krb5_asn1.KerbErrorDataType.prettyPrintNamedValues =\ krb5_asn1.KerbErrorDataTypeValues.namedValues krb5_asn1.KerbErrorDataType.prettyPrint =\ Integer_NamedValues_prettyPrint class Krb5EncryptionKey: def __init__(self, key, kvno): EncTypeChecksum = { kcrypto.Enctype.AES256: kcrypto.Cksumtype.SHA1_AES256, kcrypto.Enctype.AES128: kcrypto.Cksumtype.SHA1_AES128, kcrypto.Enctype.RC4: kcrypto.Cksumtype.HMAC_MD5, } self.key = key self.etype = key.enctype self.ctype = EncTypeChecksum[self.etype] self.kvno = kvno def __str__(self): return "etype=%d ctype=%d kvno=%d key=%s" % ( self.etype, self.ctype, self.kvno, self.key) def encrypt(self, usage, plaintext): ciphertext = kcrypto.encrypt(self.key, usage, plaintext) return ciphertext def decrypt(self, usage, ciphertext): plaintext = kcrypto.decrypt(self.key, usage, ciphertext) return plaintext def make_zeroed_checksum(self, ctype=None): if ctype is None: ctype = self.ctype checksum_len = kcrypto.checksum_len(ctype) return bytes(checksum_len) def make_checksum(self, usage, plaintext, ctype=None): if ctype is None: ctype = self.ctype cksum = kcrypto.make_checksum(ctype, self.key, usage, plaintext) return cksum def verify_checksum(self, usage, plaintext, ctype, cksum): if self.ctype != ctype: raise AssertionError(f'key checksum type ({self.ctype}) != ' f'checksum type ({ctype})') kcrypto.verify_checksum(ctype, self.key, usage, plaintext, cksum) def export_obj(self): EncryptionKey_obj = { 'keytype': self.etype, 'keyvalue': self.key.contents, } return EncryptionKey_obj class RodcPacEncryptionKey(Krb5EncryptionKey): def __init__(self, key, kvno, rodc_id=None): super().__init__(key, kvno) if rodc_id is None: kvno = self.kvno if kvno is not None: kvno >>= 16 kvno &= (1 << 16) - 1 rodc_id = kvno or None if rodc_id is not None: self.rodc_id = rodc_id.to_bytes(2, byteorder='little') else: self.rodc_id = b'' def make_rodc_zeroed_checksum(self, ctype=None): checksum = super().make_zeroed_checksum(ctype) return checksum + bytes(len(self.rodc_id)) def make_rodc_checksum(self, usage, plaintext, ctype=None): checksum = super().make_checksum(usage, plaintext, ctype) return checksum + self.rodc_id def verify_rodc_checksum(self, usage, plaintext, ctype, cksum): if self.rodc_id: cksum, cksum_rodc_id = cksum[:-2], cksum[-2:] if self.rodc_id != cksum_rodc_id: raise AssertionError(f'{self.rodc_id.hex()} != ' f'{cksum_rodc_id.hex()}') super().verify_checksum(usage, plaintext, ctype, cksum) class ZeroedChecksumKey(RodcPacEncryptionKey): def make_checksum(self, usage, plaintext, ctype=None): return self.make_zeroed_checksum(ctype) def make_rodc_checksum(self, usage, plaintext, ctype=None): return self.make_rodc_zeroed_checksum(ctype) class WrongLengthChecksumKey(RodcPacEncryptionKey): def __init__(self, key, kvno, length): super().__init__(key, kvno) self._length = length @classmethod def _adjust_to_length(cls, checksum, length): diff = length - len(checksum) if diff > 0: checksum += bytes(diff) elif diff < 0: checksum = checksum[:length] return checksum def make_zeroed_checksum(self, ctype=None): return bytes(self._length) def make_checksum(self, usage, plaintext, ctype=None): checksum = super().make_checksum(usage, plaintext, ctype) return self._adjust_to_length(checksum, self._length) def make_rodc_zeroed_checksum(self, ctype=None): return bytes(self._length) def make_rodc_checksum(self, usage, plaintext, ctype=None): checksum = super().make_rodc_checksum(usage, plaintext, ctype) return self._adjust_to_length(checksum, self._length) class KerberosCredentials(Credentials): fast_supported_bits = (security.KERB_ENCTYPE_FAST_SUPPORTED | security.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED | security.KERB_ENCTYPE_CLAIMS_SUPPORTED) non_etype_bits = fast_supported_bits | ( security.KERB_ENCTYPE_RESOURCE_SID_COMPRESSION_DISABLED) def __init__(self): super(KerberosCredentials, self).__init__() all_enc_types = 0 all_enc_types |= security.KERB_ENCTYPE_RC4_HMAC_MD5 all_enc_types |= security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96 all_enc_types |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96 self.as_supported_enctypes = all_enc_types self.tgs_supported_enctypes = all_enc_types self.ap_supported_enctypes = all_enc_types self.kvno = None self.forced_keys = {} self.forced_salt = None self.dn = None self.upn = None self.spn = None def set_as_supported_enctypes(self, value): self.as_supported_enctypes = int(value) def set_tgs_supported_enctypes(self, value): self.tgs_supported_enctypes = int(value) def set_ap_supported_enctypes(self, value): self.ap_supported_enctypes = int(value) etype_map = collections.OrderedDict([ (kcrypto.Enctype.AES256, security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96), (kcrypto.Enctype.AES128, security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96), (kcrypto.Enctype.RC4, security.KERB_ENCTYPE_RC4_HMAC_MD5), (kcrypto.Enctype.DES_MD5, security.KERB_ENCTYPE_DES_CBC_MD5), (kcrypto.Enctype.DES_CRC, security.KERB_ENCTYPE_DES_CBC_CRC) ]) @classmethod def etypes_to_bits(cls, etypes): bits = 0 for etype in etypes: bit = cls.etype_map[etype] if bits & bit: raise ValueError(f'Got duplicate etype: {etype}') bits |= bit return bits @classmethod def bits_to_etypes(cls, bits): etypes = () for etype, bit in cls.etype_map.items(): if bit & bits: bits &= ~bit etypes += (etype,) bits &= ~cls.non_etype_bits if bits != 0: raise ValueError(f'Unsupported etype bits: {bits}') return etypes def get_as_krb5_etypes(self): return self.bits_to_etypes(self.as_supported_enctypes) def get_tgs_krb5_etypes(self): return self.bits_to_etypes(self.tgs_supported_enctypes) def get_ap_krb5_etypes(self): return self.bits_to_etypes(self.ap_supported_enctypes) def set_kvno(self, kvno): # Sign-extend from 32 bits. if kvno & 1 << 31: kvno |= -1 << 31 self.kvno = kvno def get_kvno(self): return self.kvno def set_forced_key(self, etype, hexkey): etype = int(etype) contents = binascii.a2b_hex(hexkey) key = kcrypto.Key(etype, contents) self.forced_keys[etype] = RodcPacEncryptionKey(key, self.kvno) def get_forced_key(self, etype): etype = int(etype) return self.forced_keys.get(etype) def set_forced_salt(self, salt): self.forced_salt = bytes(salt) def get_forced_salt(self): return self.forced_salt def get_salt(self): if self.forced_salt is not None: return self.forced_salt upn = self.get_upn() if upn is not None: salt_name = upn.rsplit('@', 1)[0].replace('/', '') else: salt_name = self.get_username() if self.get_workstation(): salt_name = self.get_username().lower() if salt_name[-1] == '$': salt_name = salt_name[:-1] salt_string = '%shost%s.%s' % ( self.get_realm().upper(), salt_name, self.get_realm().lower()) else: salt_string = self.get_realm().upper() + salt_name return salt_string.encode('utf-8') def set_dn(self, dn): self.dn = dn def get_dn(self): return self.dn def set_spn(self, spn): self.spn = spn def get_spn(self): return self.spn def set_upn(self, upn): self.upn = upn def get_upn(self): return self.upn def update_password(self, password): self.set_password(password) self.set_kvno(self.get_kvno() + 1) class KerberosTicketCreds: def __init__(self, ticket, session_key, crealm=None, cname=None, srealm=None, sname=None, decryption_key=None, ticket_private=None, encpart_private=None): self.ticket = ticket self.session_key = session_key self.crealm = crealm self.cname = cname self.srealm = srealm self.sname = sname self.decryption_key = decryption_key self.ticket_private = ticket_private self.encpart_private = encpart_private def set_sname(self, sname): self.ticket['sname'] = sname self.sname = sname class RawKerberosTest(TestCaseInTempDir): """A raw Kerberos Test case.""" class KpasswdMode(Enum): SET = object() CHANGE = object() # The location of a SID within the PAC class SidType(Enum): BASE_SID = object() # in info3.base.groups EXTRA_SID = object() # in info3.sids RESOURCE_SID = object() # in resource_groups pac_checksum_types = {krb5pac.PAC_TYPE_SRV_CHECKSUM, krb5pac.PAC_TYPE_KDC_CHECKSUM, krb5pac.PAC_TYPE_TICKET_CHECKSUM, krb5pac.PAC_TYPE_FULL_CHECKSUM} etypes_to_test = ( {"value": -1111, "name": "dummy", }, {"value": kcrypto.Enctype.AES256, "name": "aes128", }, {"value": kcrypto.Enctype.AES128, "name": "aes256", }, {"value": kcrypto.Enctype.RC4, "name": "rc4", }, ) setup_etype_test_permutations_done = False @classmethod def setup_etype_test_permutations(cls): if cls.setup_etype_test_permutations_done: return res = [] num_idxs = len(cls.etypes_to_test) permutations = [] for num in range(1, num_idxs + 1): chunk = list(itertools.permutations(range(num_idxs), num)) for e in chunk: el = list(e) permutations.append(el) for p in permutations: name = None etypes = () for idx in p: n = cls.etypes_to_test[idx]["name"] if name is None: name = n else: name += "_%s" % n etypes += (cls.etypes_to_test[idx]["value"],) r = {"name": name, "etypes": etypes, } res.append(r) cls.etype_test_permutations = res cls.setup_etype_test_permutations_done = True @classmethod def etype_test_permutation_name_idx(cls): cls.setup_etype_test_permutations() res = [] idx = 0 for e in cls.etype_test_permutations: r = (e['name'], idx) idx += 1 res.append(r) return res def etype_test_permutation_by_idx(self, idx): e = self.etype_test_permutations[idx] return (e['name'], e['etypes']) @classmethod def setUpClass(cls): super().setUpClass() cls.host = samba.tests.env_get_var_value('SERVER') cls.dc_host = samba.tests.env_get_var_value('DC_SERVER') # A dictionary containing credentials that have already been # obtained. cls.creds_dict = {} kdc_fast_support = samba.tests.env_get_var_value('FAST_SUPPORT', allow_missing=True) if kdc_fast_support is None: kdc_fast_support = '0' cls.kdc_fast_support = bool(int(kdc_fast_support)) kdc_claims_support = samba.tests.env_get_var_value('CLAIMS_SUPPORT', allow_missing=True) if kdc_claims_support is None: kdc_claims_support = '0' cls.kdc_claims_support = bool(int(kdc_claims_support)) kdc_compound_id_support = samba.tests.env_get_var_value( 'COMPOUND_ID_SUPPORT', allow_missing=True) if kdc_compound_id_support is None: kdc_compound_id_support = '0' cls.kdc_compound_id_support = bool(int(kdc_compound_id_support)) tkt_sig_support = samba.tests.env_get_var_value('TKT_SIG_SUPPORT', allow_missing=True) if tkt_sig_support is None: tkt_sig_support = '0' cls.tkt_sig_support = bool(int(tkt_sig_support)) full_sig_support = samba.tests.env_get_var_value('FULL_SIG_SUPPORT', allow_missing=True) if full_sig_support is None: full_sig_support = '0' cls.full_sig_support = bool(int(full_sig_support)) gnutls_pbkdf2_support = samba.tests.env_get_var_value( 'GNUTLS_PBKDF2_SUPPORT', allow_missing=True) if gnutls_pbkdf2_support is None: gnutls_pbkdf2_support = '1' cls.gnutls_pbkdf2_support = bool(int(gnutls_pbkdf2_support)) expect_pac = samba.tests.env_get_var_value('EXPECT_PAC', allow_missing=True) if expect_pac is None: expect_pac = '1' cls.expect_pac = bool(int(expect_pac)) expect_extra_pac_buffers = samba.tests.env_get_var_value( 'EXPECT_EXTRA_PAC_BUFFERS', allow_missing=True) if expect_extra_pac_buffers is None: expect_extra_pac_buffers = '1' cls.expect_extra_pac_buffers = bool(int(expect_extra_pac_buffers)) cname_checking = samba.tests.env_get_var_value('CHECK_CNAME', allow_missing=True) if cname_checking is None: cname_checking = '1' cls.cname_checking = bool(int(cname_checking)) padata_checking = samba.tests.env_get_var_value('CHECK_PADATA', allow_missing=True) if padata_checking is None: padata_checking = '1' cls.padata_checking = bool(int(padata_checking)) kadmin_is_tgs = samba.tests.env_get_var_value('KADMIN_IS_TGS', allow_missing=True) if kadmin_is_tgs is None: kadmin_is_tgs = '0' cls.kadmin_is_tgs = bool(int(kadmin_is_tgs)) def setUp(self): super().setUp() self.do_asn1_print = False self.do_hexdump = False strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING', allow_missing=True) if strict_checking is None: strict_checking = '1' self.strict_checking = bool(int(strict_checking)) self.s = None self.unspecified_kvno = object() def tearDown(self): self._disconnect("tearDown") super().tearDown() def _disconnect(self, reason): if self.s is None: return self.s.close() self.s = None if self.do_hexdump: sys.stderr.write("disconnect[%s]\n" % reason) def _connect_tcp(self, host, port=None): if port is None: port = 88 try: self.a = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM, socket.SOL_TCP, 0) self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2]) self.s.settimeout(10) self.s.connect(self.a[0][4]) except socket.error: self.s.close() raise except IOError: self.s.close() raise def connect(self, host, port=None): self.assertNotConnected() self._connect_tcp(host, port) if self.do_hexdump: sys.stderr.write("connected[%s]\n" % host) def env_get_var(self, varname, prefix, fallback_default=True, allow_missing=False): val = None if prefix is not None: allow_missing_prefix = allow_missing or fallback_default val = samba.tests.env_get_var_value( '%s_%s' % (prefix, varname), allow_missing=allow_missing_prefix) else: fallback_default = True if val is None and fallback_default: val = samba.tests.env_get_var_value(varname, allow_missing=allow_missing) return val def _get_krb5_creds_from_env(self, prefix, default_username=None, allow_missing_password=False, allow_missing_keys=True, require_strongest_key=False): c = KerberosCredentials() c.guess() domain = self.env_get_var('DOMAIN', prefix) realm = self.env_get_var('REALM', prefix) allow_missing_username = default_username is not None username = self.env_get_var('USERNAME', prefix, fallback_default=False, allow_missing=allow_missing_username) if username is None: username = default_username password = self.env_get_var('PASSWORD', prefix, fallback_default=False, allow_missing=allow_missing_password) c.set_domain(domain) c.set_realm(realm) c.set_username(username) if password is not None: c.set_password(password) as_supported_enctypes = self.env_get_var('AS_SUPPORTED_ENCTYPES', prefix, allow_missing=True) if as_supported_enctypes is not None: c.set_as_supported_enctypes(as_supported_enctypes) tgs_supported_enctypes = self.env_get_var('TGS_SUPPORTED_ENCTYPES', prefix, allow_missing=True) if tgs_supported_enctypes is not None: c.set_tgs_supported_enctypes(tgs_supported_enctypes) ap_supported_enctypes = self.env_get_var('AP_SUPPORTED_ENCTYPES', prefix, allow_missing=True) if ap_supported_enctypes is not None: c.set_ap_supported_enctypes(ap_supported_enctypes) if require_strongest_key: kvno_allow_missing = False if password is None: aes256_allow_missing = False else: aes256_allow_missing = True else: kvno_allow_missing = allow_missing_keys aes256_allow_missing = allow_missing_keys kvno = self.env_get_var('KVNO', prefix, fallback_default=False, allow_missing=kvno_allow_missing) if kvno is not None: c.set_kvno(int(kvno)) aes256_key = self.env_get_var('AES256_KEY_HEX', prefix, fallback_default=False, allow_missing=aes256_allow_missing) if aes256_key is not None: c.set_forced_key(kcrypto.Enctype.AES256, aes256_key) aes128_key = self.env_get_var('AES128_KEY_HEX', prefix, fallback_default=False, allow_missing=True) if aes128_key is not None: c.set_forced_key(kcrypto.Enctype.AES128, aes128_key) rc4_key = self.env_get_var('RC4_KEY_HEX', prefix, fallback_default=False, allow_missing=True) if rc4_key is not None: c.set_forced_key(kcrypto.Enctype.RC4, rc4_key) if not allow_missing_keys: self.assertTrue(c.forced_keys, 'Please supply %s encryption keys ' 'in environment' % prefix) return c def _get_krb5_creds(self, prefix, default_username=None, allow_missing_password=False, allow_missing_keys=True, require_strongest_key=False, fallback_creds_fn=None): if prefix in self.creds_dict: return self.creds_dict[prefix] # We don't have the credentials already creds = None env_err = None try: # Try to obtain them from the environment creds = self._get_krb5_creds_from_env( prefix, default_username=default_username, allow_missing_password=allow_missing_password, allow_missing_keys=allow_missing_keys, require_strongest_key=require_strongest_key) except Exception as err: # An error occurred, so save it for later env_err = err else: self.assertIsNotNone(creds) # Save the obtained credentials self.creds_dict[prefix] = creds return creds if fallback_creds_fn is not None: try: # Try to use the fallback method creds = fallback_creds_fn() except Exception as err: print("ERROR FROM ENV: %r" % (env_err)) print("FALLBACK-FN: %s" % (fallback_creds_fn)) print("FALLBACK-ERROR: %r" % (err)) else: self.assertIsNotNone(creds) # Save the obtained credentials self.creds_dict[prefix] = creds return creds # Both methods failed, so raise the exception from the # environment method raise env_err def get_user_creds(self, allow_missing_password=False, allow_missing_keys=True): c = self._get_krb5_creds(prefix=None, allow_missing_password=allow_missing_password, allow_missing_keys=allow_missing_keys) return c def get_service_creds(self, allow_missing_password=False, allow_missing_keys=True): c = self._get_krb5_creds(prefix='SERVICE', allow_missing_password=allow_missing_password, allow_missing_keys=allow_missing_keys) return c def get_client_creds(self, allow_missing_password=False, allow_missing_keys=True): c = self._get_krb5_creds(prefix='CLIENT', allow_missing_password=allow_missing_password, allow_missing_keys=allow_missing_keys) return c def get_server_creds(self, allow_missing_password=False, allow_missing_keys=True): c = self._get_krb5_creds(prefix='SERVER', allow_missing_password=allow_missing_password, allow_missing_keys=allow_missing_keys) return c def get_admin_creds(self, allow_missing_password=False, allow_missing_keys=True): c = self._get_krb5_creds(prefix='ADMIN', allow_missing_password=allow_missing_password, allow_missing_keys=allow_missing_keys) c.set_gensec_features(c.get_gensec_features() | FEATURE_SEAL) c.set_workstation('') return c def get_rodc_krbtgt_creds(self, require_keys=True, require_strongest_key=False): if require_strongest_key: self.assertTrue(require_keys) c = self._get_krb5_creds(prefix='RODC_KRBTGT', allow_missing_password=True, allow_missing_keys=not require_keys, require_strongest_key=require_strongest_key) return c def get_krbtgt_creds(self, require_keys=True, require_strongest_key=False): if require_strongest_key: self.assertTrue(require_keys) c = self._get_krb5_creds(prefix='KRBTGT', default_username='krbtgt', allow_missing_password=True, allow_missing_keys=not require_keys, require_strongest_key=require_strongest_key) return c def get_anon_creds(self): c = Credentials() c.set_anonymous() return c def asn1_dump(self, name, obj, asn1_print=None): if asn1_print is None: asn1_print = self.do_asn1_print if asn1_print: if name is not None: sys.stderr.write("%s:\n%s" % (name, obj)) else: sys.stderr.write("%s" % (obj)) def hex_dump(self, name, blob, hexdump=None): if hexdump is None: hexdump = self.do_hexdump if hexdump: sys.stderr.write( "%s: %d\n%s" % (name, len(blob), self.hexdump(blob))) def der_decode( self, blob, asn1Spec=None, native_encode=True, asn1_print=None, hexdump=None): if asn1Spec is not None: class_name = type(asn1Spec).__name__.split(':')[0] else: class_name = "" self.hex_dump(class_name, blob, hexdump=hexdump) obj, _ = pyasn1_der_decode(blob, asn1Spec=asn1Spec) self.asn1_dump(None, obj, asn1_print=asn1_print) if native_encode: obj = pyasn1_native_encode(obj) return obj def der_encode( self, obj, asn1Spec=None, native_decode=True, asn1_print=None, hexdump=None): if native_decode: obj = pyasn1_native_decode(obj, asn1Spec=asn1Spec) class_name = type(obj).__name__.split(':')[0] if class_name is not None: self.asn1_dump(None, obj, asn1_print=asn1_print) blob = pyasn1_der_encode(obj) if class_name is not None: self.hex_dump(class_name, blob, hexdump=hexdump) return blob def send_pdu(self, req, asn1_print=None, hexdump=None): k5_pdu = self.der_encode( req, native_decode=False, asn1_print=asn1_print, hexdump=False) self.send_msg(k5_pdu, hexdump=hexdump) def send_msg(self, msg, hexdump=None): header = struct.pack('>I', len(msg)) req_pdu = header req_pdu += msg self.hex_dump("send_msg", header, hexdump=hexdump) self.hex_dump("send_msg", msg, hexdump=hexdump) try: while True: sent = self.s.send(req_pdu, 0) if sent == len(req_pdu): return req_pdu = req_pdu[sent:] except socket.error as e: self._disconnect("send_msg: %s" % e) raise except IOError as e: self._disconnect("send_msg: %s" % e) raise def recv_raw(self, num_recv=0xffff, hexdump=None, timeout=None): rep_pdu = None try: if timeout is not None: self.s.settimeout(timeout) rep_pdu = self.s.recv(num_recv, 0) self.s.settimeout(10) if len(rep_pdu) == 0: self._disconnect("recv_raw: EOF") return None self.hex_dump("recv_raw", rep_pdu, hexdump=hexdump) except socket.timeout: self.s.settimeout(10) sys.stderr.write("recv_raw: TIMEOUT\n") except socket.error as e: self._disconnect("recv_raw: %s" % e) raise except IOError as e: self._disconnect("recv_raw: %s" % e) raise return rep_pdu def recv_pdu_raw(self, asn1_print=None, hexdump=None, timeout=None): raw_pdu = self.recv_raw( num_recv=4, hexdump=hexdump, timeout=timeout) if raw_pdu is None: return None header = struct.unpack(">I", raw_pdu[0:4]) k5_len = header[0] if k5_len == 0: return "" missing = k5_len rep_pdu = b'' while missing > 0: raw_pdu = self.recv_raw( num_recv=missing, hexdump=hexdump, timeout=timeout) self.assertGreaterEqual(len(raw_pdu), 1) rep_pdu += raw_pdu missing = k5_len - len(rep_pdu) return rep_pdu def recv_reply(self, asn1_print=None, hexdump=None, timeout=None): rep_pdu = self.recv_pdu_raw(asn1_print=asn1_print, hexdump=hexdump, timeout=timeout) if not rep_pdu: return None, rep_pdu k5_raw = self.der_decode( rep_pdu, asn1Spec=None, native_encode=False, asn1_print=False, hexdump=False) pvno = k5_raw['field-0'] self.assertEqual(pvno, 5) msg_type = k5_raw['field-1'] self.assertIn(msg_type, [KRB_AS_REP, KRB_TGS_REP, KRB_ERROR]) if msg_type == KRB_AS_REP: asn1Spec = krb5_asn1.AS_REP() elif msg_type == KRB_TGS_REP: asn1Spec = krb5_asn1.TGS_REP() elif msg_type == KRB_ERROR: asn1Spec = krb5_asn1.KRB_ERROR() rep = self.der_decode(rep_pdu, asn1Spec=asn1Spec, asn1_print=asn1_print, hexdump=False) return (rep, rep_pdu) def recv_pdu(self, asn1_print=None, hexdump=None, timeout=None): (rep, rep_pdu) = self.recv_reply(asn1_print=asn1_print, hexdump=hexdump, timeout=timeout) return rep def assertIsConnected(self): self.assertIsNotNone(self.s, msg="Not connected") def assertNotConnected(self): self.assertIsNone(self.s, msg="Is connected") def send_recv_transaction( self, req, asn1_print=None, hexdump=None, timeout=None, to_rodc=False): host = self.host if to_rodc else self.dc_host self.connect(host) try: self.send_pdu(req, asn1_print=asn1_print, hexdump=hexdump) rep = self.recv_pdu( asn1_print=asn1_print, hexdump=hexdump, timeout=timeout) except Exception: self._disconnect("transaction failed") raise self._disconnect("transaction done") return rep def assertNoValue(self, value): self.assertTrue(value.isNoValue) def assertHasValue(self, value): self.assertIsNotNone(value) def getElementValue(self, obj, elem): return obj.get(elem) def assertElementMissing(self, obj, elem): v = self.getElementValue(obj, elem) self.assertIsNone(v) def assertElementPresent(self, obj, elem, expect_empty=False): v = self.getElementValue(obj, elem) self.assertIsNotNone(v) if self.strict_checking: if isinstance(v, collections.abc.Container): if expect_empty: self.assertEqual(0, len(v)) else: self.assertNotEqual(0, len(v)) def assertElementEqual(self, obj, elem, value): v = self.getElementValue(obj, elem) self.assertIsNotNone(v) self.assertEqual(v, value) def assertElementEqualUTF8(self, obj, elem, value): v = self.getElementValue(obj, elem) self.assertIsNotNone(v) self.assertEqual(v, bytes(value, 'utf8')) def assertPrincipalEqual(self, princ1, princ2): self.assertEqual(princ1['name-type'], princ2['name-type']) self.assertEqual( len(princ1['name-string']), len(princ2['name-string']), msg="princ1=%s != princ2=%s" % (princ1, princ2)) for idx in range(len(princ1['name-string'])): self.assertEqual( princ1['name-string'][idx], princ2['name-string'][idx], msg="princ1=%s != princ2=%s" % (princ1, princ2)) def assertElementEqualPrincipal(self, obj, elem, value): v = self.getElementValue(obj, elem) self.assertIsNotNone(v) v = pyasn1_native_decode(v, asn1Spec=krb5_asn1.PrincipalName()) self.assertPrincipalEqual(v, value) def assertElementKVNO(self, obj, elem, value): v = self.getElementValue(obj, elem) if value == "autodetect": value = v if value is not None: self.assertIsNotNone(v) # The value on the wire should never be 0 self.assertNotEqual(v, 0) # unspecified_kvno means we don't know the kvno, # but want to enforce its presence if value is not self.unspecified_kvno: value = int(value) self.assertNotEqual(value, 0) self.assertEqual(v, value) else: self.assertIsNone(v) def assertElementFlags(self, obj, elem, expected, unexpected): v = self.getElementValue(obj, elem) self.assertIsNotNone(v) if expected is not None: self.assertIsInstance(expected, krb5_asn1.TicketFlags) for i, flag in enumerate(expected): if flag == 1: self.assertEqual('1', v[i], f"'{expected.namedValues[i]}' " f"expected in {v}") if unexpected is not None: self.assertIsInstance(unexpected, krb5_asn1.TicketFlags) for i, flag in enumerate(unexpected): if flag == 1: self.assertEqual('0', v[i], f"'{unexpected.namedValues[i]}' " f"unexpected in {v}") def assertSequenceElementsEqual(self, expected, got, *, require_strict=None, unchecked=None, require_ordered=True): if self.strict_checking and require_ordered and not unchecked: self.assertEqual(expected, got) else: fail_msg = f'expected: {expected} got: {got}' ignored = set() if unchecked: ignored.update(unchecked) if require_strict and not self.strict_checking: ignored.update(require_strict) if ignored: fail_msg += f' (ignoring: {ignored})' expected = (x for x in expected if x not in ignored) got = (x for x in got if x not in ignored) self.assertCountEqual(expected, got, fail_msg) def get_KerberosTimeWithUsec(self, epoch=None, offset=None): if epoch is None: epoch = time.time() if offset is not None: epoch = epoch + int(offset) dt = datetime.datetime.fromtimestamp(epoch, tz=datetime.timezone.utc) return (dt.strftime("%Y%m%d%H%M%SZ"), dt.microsecond) def get_KerberosTime(self, epoch=None, offset=None): (s, _) = self.get_KerberosTimeWithUsec(epoch=epoch, offset=offset) return s def get_EpochFromKerberosTime(self, kerberos_time): if isinstance(kerberos_time, bytes): kerberos_time = kerberos_time.decode() epoch = datetime.datetime.strptime(kerberos_time, '%Y%m%d%H%M%SZ') epoch = epoch.replace(tzinfo=datetime.timezone.utc) epoch = int(epoch.timestamp()) return epoch def get_Nonce(self): nonce_min = 0x7f000000 nonce_max = 0x7fffffff v = random.randint(nonce_min, nonce_max) return v def get_pa_dict(self, pa_data): pa_dict = {} if pa_data is not None: for pa in pa_data: pa_type = pa['padata-type'] if pa_type in pa_dict: raise RuntimeError(f'Duplicate type {pa_type}') pa_dict[pa_type] = pa['padata-value'] return pa_dict def SessionKey_create(self, etype, contents, kvno=None): key = kcrypto.Key(etype, contents) return RodcPacEncryptionKey(key, kvno) def PasswordKey_create(self, etype=None, pwd=None, salt=None, kvno=None, params=None): self.assertIsNotNone(pwd) self.assertIsNotNone(salt) key = kcrypto.string_to_key(etype, pwd, salt, params=params) return RodcPacEncryptionKey(key, kvno) def PasswordKey_from_etype_info2(self, creds, etype_info2, kvno=None): e = etype_info2['etype'] salt = etype_info2.get('salt') params = etype_info2.get('s2kparams') return self.PasswordKey_from_etype(creds, e, kvno=kvno, salt=salt, params=params) def PasswordKey_from_creds(self, creds, etype): kvno = creds.get_kvno() salt = creds.get_salt() return self.PasswordKey_from_etype(creds, etype, kvno=kvno, salt=salt) def PasswordKey_from_etype(self, creds, etype, kvno=None, salt=None, params=None): if etype == kcrypto.Enctype.RC4: nthash = creds.get_nt_hash() return self.SessionKey_create(etype=etype, contents=nthash, kvno=kvno) password = creds.get_password().encode('utf-8') return self.PasswordKey_create( etype=etype, pwd=password, salt=salt, kvno=kvno) def TicketDecryptionKey_from_creds(self, creds, etype=None): if etype is None: etypes = creds.get_tgs_krb5_etypes() if etypes: etype = etypes[0] else: etype = kcrypto.Enctype.RC4 forced_key = creds.get_forced_key(etype) if forced_key is not None: return forced_key kvno = creds.get_kvno() fail_msg = ("%s has no fixed key for etype[%s] kvno[%s] " "nor a password specified, " % ( creds.get_username(), etype, kvno)) if etype == kcrypto.Enctype.RC4: nthash = creds.get_nt_hash() self.assertIsNotNone(nthash, msg=fail_msg) return self.SessionKey_create(etype=etype, contents=nthash, kvno=kvno) password = creds.get_password() self.assertIsNotNone(password, msg=fail_msg) salt = creds.get_salt() return self.PasswordKey_create(etype=etype, pwd=password, salt=salt, kvno=kvno) def RandomKey(self, etype): e = kcrypto._get_enctype_profile(etype) contents = samba.generate_random_bytes(e.keysize) return self.SessionKey_create(etype=etype, contents=contents) def EncryptionKey_import(self, EncryptionKey_obj): return self.SessionKey_create(EncryptionKey_obj['keytype'], EncryptionKey_obj['keyvalue']) def EncryptedData_create(self, key, usage, plaintext): # EncryptedData ::= SEQUENCE { # etype [0] Int32 -- EncryptionType --, # kvno [1] Int32 OPTIONAL, # cipher [2] OCTET STRING -- ciphertext # } ciphertext = key.encrypt(usage, plaintext) EncryptedData_obj = { 'etype': key.etype, 'cipher': ciphertext } if key.kvno is not None: EncryptedData_obj['kvno'] = key.kvno return EncryptedData_obj def Checksum_create(self, key, usage, plaintext, ctype=None): # Checksum ::= SEQUENCE { # cksumtype [0] Int32, # checksum [1] OCTET STRING # } if ctype is None: ctype = key.ctype checksum = key.make_checksum(usage, plaintext, ctype=ctype) Checksum_obj = { 'cksumtype': ctype, 'checksum': checksum, } return Checksum_obj @classmethod def PrincipalName_create(cls, name_type, names): # PrincipalName ::= SEQUENCE { # name-type [0] Int32, # name-string [1] SEQUENCE OF KerberosString # } PrincipalName_obj = { 'name-type': name_type, 'name-string': names, } return PrincipalName_obj def AuthorizationData_create(self, ad_type, ad_data): # AuthorizationData ::= SEQUENCE { # ad-type [0] Int32, # ad-data [1] OCTET STRING # } AUTH_DATA_obj = { 'ad-type': ad_type, 'ad-data': ad_data } return AUTH_DATA_obj def PA_DATA_create(self, padata_type, padata_value): # PA-DATA ::= SEQUENCE { # -- NOTE: first tag is [1], not [0] # padata-type [1] Int32, # padata-value [2] OCTET STRING -- might be encoded AP-REQ # } PA_DATA_obj = { 'padata-type': padata_type, 'padata-value': padata_value, } return PA_DATA_obj def PA_ENC_TS_ENC_create(self, ts, usec): # PA-ENC-TS-ENC ::= SEQUENCE { # patimestamp[0] KerberosTime, -- client's time # pausec[1] krb5int32 OPTIONAL # } PA_ENC_TS_ENC_obj = { 'patimestamp': ts, 'pausec': usec, } return PA_ENC_TS_ENC_obj def PA_PAC_OPTIONS_create(self, options): # PA-PAC-OPTIONS ::= SEQUENCE { # options [0] PACOptionFlags # } PA_PAC_OPTIONS_obj = { 'options': options } return PA_PAC_OPTIONS_obj def KRB_FAST_ARMOR_create(self, armor_type, armor_value): # KrbFastArmor ::= SEQUENCE { # armor-type [0] Int32, # armor-value [1] OCTET STRING, # ... # } KRB_FAST_ARMOR_obj = { 'armor-type': armor_type, 'armor-value': armor_value } return KRB_FAST_ARMOR_obj def KRB_FAST_REQ_create(self, fast_options, padata, req_body): # KrbFastReq ::= SEQUENCE { # fast-options [0] FastOptions, # padata [1] SEQUENCE OF PA-DATA, # req-body [2] KDC-REQ-BODY, # ... # } KRB_FAST_REQ_obj = { 'fast-options': fast_options, 'padata': padata, 'req-body': req_body } return KRB_FAST_REQ_obj def KRB_FAST_ARMORED_REQ_create(self, armor, req_checksum, enc_fast_req): # KrbFastArmoredReq ::= SEQUENCE { # armor [0] KrbFastArmor OPTIONAL, # req-checksum [1] Checksum, # enc-fast-req [2] EncryptedData -- KrbFastReq -- # } KRB_FAST_ARMORED_REQ_obj = { 'req-checksum': req_checksum, 'enc-fast-req': enc_fast_req } if armor is not None: KRB_FAST_ARMORED_REQ_obj['armor'] = armor return KRB_FAST_ARMORED_REQ_obj def PA_FX_FAST_REQUEST_create(self, armored_data): # PA-FX-FAST-REQUEST ::= CHOICE { # armored-data [0] KrbFastArmoredReq, # ... # } PA_FX_FAST_REQUEST_obj = { 'armored-data': armored_data } return PA_FX_FAST_REQUEST_obj def KERB_PA_PAC_REQUEST_create(self, include_pac, pa_data_create=True): # KERB-PA-PAC-REQUEST ::= SEQUENCE { # include-pac[0] BOOLEAN --If TRUE, and no pac present, # -- include PAC. # --If FALSE, and PAC present, # -- remove PAC. # } KERB_PA_PAC_REQUEST_obj = { 'include-pac': include_pac, } if not pa_data_create: return KERB_PA_PAC_REQUEST_obj pa_pac = self.der_encode(KERB_PA_PAC_REQUEST_obj, asn1Spec=krb5_asn1.KERB_PA_PAC_REQUEST()) pa_data = self.PA_DATA_create(PADATA_PAC_REQUEST, pa_pac) return pa_data def get_pa_pac_options(self, options): pac_options = self.PA_PAC_OPTIONS_create(options) pac_options = self.der_encode(pac_options, asn1Spec=krb5_asn1.PA_PAC_OPTIONS()) pac_options = self.PA_DATA_create(PADATA_PAC_OPTIONS, pac_options) return pac_options def KDC_REQ_BODY_create(self, kdc_options, cname, realm, sname, from_time, till_time, renew_time, nonce, etypes, addresses, additional_tickets, EncAuthorizationData, EncAuthorizationData_key, EncAuthorizationData_usage, asn1_print=None, hexdump=None): # KDC-REQ-BODY ::= SEQUENCE { # kdc-options [0] KDCOptions, # cname [1] PrincipalName OPTIONAL # -- Used only in AS-REQ --, # realm [2] Realm # -- Server's realm # -- Also client's in AS-REQ --, # sname [3] PrincipalName OPTIONAL, # from [4] KerberosTime OPTIONAL, # till [5] KerberosTime, # rtime [6] KerberosTime OPTIONAL, # nonce [7] UInt32, # etype [8] SEQUENCE OF Int32 # -- EncryptionType # -- in preference order --, # addresses [9] HostAddresses OPTIONAL, # enc-authorization-data [10] EncryptedData OPTIONAL # -- AuthorizationData --, # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL # -- NOTE: not empty # } if EncAuthorizationData is not None: enc_ad_plain = self.der_encode( EncAuthorizationData, asn1Spec=krb5_asn1.AuthorizationData(), asn1_print=asn1_print, hexdump=hexdump) enc_ad = self.EncryptedData_create(EncAuthorizationData_key, EncAuthorizationData_usage, enc_ad_plain) else: enc_ad = None KDC_REQ_BODY_obj = { 'kdc-options': kdc_options, 'realm': realm, 'till': till_time, 'nonce': nonce, 'etype': etypes, } if cname is not None: KDC_REQ_BODY_obj['cname'] = cname if sname is not None: KDC_REQ_BODY_obj['sname'] = sname if from_time is not None: KDC_REQ_BODY_obj['from'] = from_time if renew_time is not None: KDC_REQ_BODY_obj['rtime'] = renew_time if addresses is not None: KDC_REQ_BODY_obj['addresses'] = addresses if enc_ad is not None: KDC_REQ_BODY_obj['enc-authorization-data'] = enc_ad if additional_tickets is not None: KDC_REQ_BODY_obj['additional-tickets'] = additional_tickets return KDC_REQ_BODY_obj def KDC_REQ_create(self, msg_type, padata, req_body, asn1Spec=None, asn1_print=None, hexdump=None): # KDC-REQ ::= SEQUENCE { # -- NOTE: first tag is [1], not [0] # pvno [1] INTEGER (5) , # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --), # padata [3] SEQUENCE OF PA-DATA OPTIONAL # -- NOTE: not empty --, # req-body [4] KDC-REQ-BODY # } # KDC_REQ_obj = { 'pvno': 5, 'msg-type': msg_type, 'req-body': req_body, } if padata is not None: KDC_REQ_obj['padata'] = padata if asn1Spec is not None: KDC_REQ_decoded = pyasn1_native_decode( KDC_REQ_obj, asn1Spec=asn1Spec) else: KDC_REQ_decoded = None return KDC_REQ_obj, KDC_REQ_decoded def AS_REQ_create(self, padata, # optional kdc_options, # required cname, # optional realm, # required sname, # optional from_time, # optional till_time, # required renew_time, # optional nonce, # required etypes, # required addresses, # optional additional_tickets, native_decoded_only=True, asn1_print=None, hexdump=None): # KDC-REQ ::= SEQUENCE { # -- NOTE: first tag is [1], not [0] # pvno [1] INTEGER (5) , # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --), # padata [3] SEQUENCE OF PA-DATA OPTIONAL # -- NOTE: not empty --, # req-body [4] KDC-REQ-BODY # } # # KDC-REQ-BODY ::= SEQUENCE { # kdc-options [0] KDCOptions, # cname [1] PrincipalName OPTIONAL # -- Used only in AS-REQ --, # realm [2] Realm # -- Server's realm # -- Also client's in AS-REQ --, # sname [3] PrincipalName OPTIONAL, # from [4] KerberosTime OPTIONAL, # till [5] KerberosTime, # rtime [6] KerberosTime OPTIONAL, # nonce [7] UInt32, # etype [8] SEQUENCE OF Int32 # -- EncryptionType # -- in preference order --, # addresses [9] HostAddresses OPTIONAL, # enc-authorization-data [10] EncryptedData OPTIONAL # -- AuthorizationData --, # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL # -- NOTE: not empty # } KDC_REQ_BODY_obj = self.KDC_REQ_BODY_create( kdc_options, cname, realm, sname, from_time, till_time, renew_time, nonce, etypes, addresses, additional_tickets, EncAuthorizationData=None, EncAuthorizationData_key=None, EncAuthorizationData_usage=None, asn1_print=asn1_print, hexdump=hexdump) obj, decoded = self.KDC_REQ_create( msg_type=KRB_AS_REQ, padata=padata, req_body=KDC_REQ_BODY_obj, asn1Spec=krb5_asn1.AS_REQ(), asn1_print=asn1_print, hexdump=hexdump) if native_decoded_only: return decoded return decoded, obj def AP_REQ_create(self, ap_options, ticket, authenticator): # AP-REQ ::= [APPLICATION 14] SEQUENCE { # pvno [0] INTEGER (5), # msg-type [1] INTEGER (14), # ap-options [2] APOptions, # ticket [3] Ticket, # authenticator [4] EncryptedData -- Authenticator # } AP_REQ_obj = { 'pvno': 5, 'msg-type': KRB_AP_REQ, 'ap-options': ap_options, 'ticket': ticket, 'authenticator': authenticator, } return AP_REQ_obj def Authenticator_create( self, crealm, cname, cksum, cusec, ctime, subkey, seq_number, authorization_data): # -- Unencrypted authenticator # Authenticator ::= [APPLICATION 2] SEQUENCE { # authenticator-vno [0] INTEGER (5), # crealm [1] Realm, # cname [2] PrincipalName, # cksum [3] Checksum OPTIONAL, # cusec [4] Microseconds, # ctime [5] KerberosTime, # subkey [6] EncryptionKey OPTIONAL, # seq-number [7] UInt32 OPTIONAL, # authorization-data [8] AuthorizationData OPTIONAL # } Authenticator_obj = { 'authenticator-vno': 5, 'crealm': crealm, 'cname': cname, 'cusec': cusec, 'ctime': ctime, } if cksum is not None: Authenticator_obj['cksum'] = cksum if subkey is not None: Authenticator_obj['subkey'] = subkey if seq_number is not None: Authenticator_obj['seq-number'] = seq_number if authorization_data is not None: Authenticator_obj['authorization-data'] = authorization_data return Authenticator_obj def TGS_REQ_create(self, padata, # optional cusec, ctime, ticket, kdc_options, # required cname, # optional realm, # required sname, # optional from_time, # optional till_time, # required renew_time, # optional nonce, # required etypes, # required addresses, # optional EncAuthorizationData, EncAuthorizationData_key, additional_tickets, ticket_session_key, authenticator_subkey=None, body_checksum_type=None, native_decoded_only=True, asn1_print=None, hexdump=None): # KDC-REQ ::= SEQUENCE { # -- NOTE: first tag is [1], not [0] # pvno [1] INTEGER (5) , # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --), # padata [3] SEQUENCE OF PA-DATA OPTIONAL # -- NOTE: not empty --, # req-body [4] KDC-REQ-BODY # } # # KDC-REQ-BODY ::= SEQUENCE { # kdc-options [0] KDCOptions, # cname [1] PrincipalName OPTIONAL # -- Used only in AS-REQ --, # realm [2] Realm # -- Server's realm # -- Also client's in AS-REQ --, # sname [3] PrincipalName OPTIONAL, # from [4] KerberosTime OPTIONAL, # till [5] KerberosTime, # rtime [6] KerberosTime OPTIONAL, # nonce [7] UInt32, # etype [8] SEQUENCE OF Int32 # -- EncryptionType # -- in preference order --, # addresses [9] HostAddresses OPTIONAL, # enc-authorization-data [10] EncryptedData OPTIONAL # -- AuthorizationData --, # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL # -- NOTE: not empty # } if authenticator_subkey is not None: EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SUBKEY else: EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SESSION req_body = self.KDC_REQ_BODY_create( kdc_options=kdc_options, cname=None, realm=realm, sname=sname, from_time=from_time, till_time=till_time, renew_time=renew_time, nonce=nonce, etypes=etypes, addresses=addresses, additional_tickets=additional_tickets, EncAuthorizationData=EncAuthorizationData, EncAuthorizationData_key=EncAuthorizationData_key, EncAuthorizationData_usage=EncAuthorizationData_usage) req_body_blob = self.der_encode(req_body, asn1Spec=krb5_asn1.KDC_REQ_BODY(), asn1_print=asn1_print, hexdump=hexdump) req_body_checksum = self.Checksum_create(ticket_session_key, KU_TGS_REQ_AUTH_CKSUM, req_body_blob, ctype=body_checksum_type) subkey_obj = None if authenticator_subkey is not None: subkey_obj = authenticator_subkey.export_obj() seq_number = random.randint(0, 0xfffffffe) authenticator = self.Authenticator_create( crealm=realm, cname=cname, cksum=req_body_checksum, cusec=cusec, ctime=ctime, subkey=subkey_obj, seq_number=seq_number, authorization_data=None) authenticator = self.der_encode( authenticator, asn1Spec=krb5_asn1.Authenticator(), asn1_print=asn1_print, hexdump=hexdump) authenticator = self.EncryptedData_create( ticket_session_key, KU_TGS_REQ_AUTH, authenticator) ap_options = krb5_asn1.APOptions('0') ap_req = self.AP_REQ_create(ap_options=str(ap_options), ticket=ticket, authenticator=authenticator) ap_req = self.der_encode(ap_req, asn1Spec=krb5_asn1.AP_REQ(), asn1_print=asn1_print, hexdump=hexdump) pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req) if padata is not None: padata.append(pa_tgs_req) else: padata = [pa_tgs_req] obj, decoded = self.KDC_REQ_create( msg_type=KRB_TGS_REQ, padata=padata, req_body=req_body, asn1Spec=krb5_asn1.TGS_REQ(), asn1_print=asn1_print, hexdump=hexdump) if native_decoded_only: return decoded return decoded, obj def PA_S4U2Self_create(self, name, realm, tgt_session_key, ctype=None): # PA-S4U2Self ::= SEQUENCE { # name [0] PrincipalName, # realm [1] Realm, # cksum [2] Checksum, # auth [3] GeneralString # } cksum_data = name['name-type'].to_bytes(4, byteorder='little') for n in name['name-string']: cksum_data += n.encode() cksum_data += realm.encode() cksum_data += "Kerberos".encode() cksum = self.Checksum_create(tgt_session_key, KU_NON_KERB_CKSUM_SALT, cksum_data, ctype) PA_S4U2Self_obj = { 'name': name, 'realm': realm, 'cksum': cksum, 'auth': "Kerberos", } pa_s4u2self = self.der_encode( PA_S4U2Self_obj, asn1Spec=krb5_asn1.PA_S4U2Self()) return self.PA_DATA_create(PADATA_FOR_USER, pa_s4u2self) def ChangePasswdDataMS_create(self, new_password, target_princ=None, target_realm=None): ChangePasswdDataMS_obj = { 'newpasswd': new_password, } if target_princ is not None: ChangePasswdDataMS_obj['targname'] = target_princ if target_realm is not None: ChangePasswdDataMS_obj['targrealm'] = target_realm change_password_data = self.der_encode( ChangePasswdDataMS_obj, asn1Spec=krb5_asn1.ChangePasswdDataMS()) return change_password_data def KRB_PRIV_create(self, subkey, user_data, s_address, timestamp=None, usec=None, seq_number=None, r_address=None): EncKrbPrivPart_obj = { 'user-data': user_data, 's-address': s_address, } if timestamp is not None: EncKrbPrivPart_obj['timestamp'] = timestamp if usec is not None: EncKrbPrivPart_obj['usec'] = usec if seq_number is not None: EncKrbPrivPart_obj['seq-number'] = seq_number if r_address is not None: EncKrbPrivPart_obj['r-address'] = r_address enc_krb_priv_part = self.der_encode( EncKrbPrivPart_obj, asn1Spec=krb5_asn1.EncKrbPrivPart()) enc_data = self.EncryptedData_create(subkey, KU_KRB_PRIV, enc_krb_priv_part) KRB_PRIV_obj = { 'pvno': 5, 'msg-type': KRB_PRIV, 'enc-part': enc_data, } krb_priv = self.der_encode( KRB_PRIV_obj, asn1Spec=krb5_asn1.KRB_PRIV()) return krb_priv def kpasswd_create(self, subkey, user_data, version, seq_number, ap_req, local_address, remote_address): self.assertIsNotNone(self.s, 'call self.connect() first') timestamp, usec = self.get_KerberosTimeWithUsec() krb_priv = self.KRB_PRIV_create(subkey, user_data, s_address=local_address, timestamp=timestamp, usec=usec, seq_number=seq_number, r_address=remote_address) size = 6 + len(ap_req) + len(krb_priv) self.assertLess(size, 0x10000) msg = bytearray() msg.append(size >> 8) msg.append(size & 0xff) msg.append(version >> 8) msg.append(version & 0xff) msg.append(len(ap_req) >> 8) msg.append(len(ap_req) & 0xff) # Note: for sets, there could be a little-endian four-byte length here. msg.extend(ap_req) msg.extend(krb_priv) return msg def get_enc_part(self, obj, key, usage): self.assertElementEqual(obj, 'pvno', 5) enc_part = obj['enc-part'] self.assertElementEqual(enc_part, 'etype', key.etype) self.assertElementKVNO(enc_part, 'kvno', key.kvno) enc_part = key.decrypt(usage, enc_part['cipher']) return enc_part def kpasswd_exchange(self, ticket, new_password, expected_code, expected_msg, mode, target_princ=None, target_realm=None, ap_options=None, send_seq_number=True): if mode is self.KpasswdMode.SET: version = 0xff80 user_data = self.ChangePasswdDataMS_create(new_password, target_princ, target_realm) elif mode is self.KpasswdMode.CHANGE: self.assertIsNone(target_princ, 'target_princ only valid for pw set') self.assertIsNone(target_realm, 'target_realm only valid for pw set') version = 1 user_data = new_password.encode('utf-8') else: self.fail(f'invalid mode {mode}') subkey = self.RandomKey(kcrypto.Enctype.AES256) if ap_options is None: ap_options = '0' ap_options = str(krb5_asn1.APOptions(ap_options)) kdc_exchange_dict = { 'tgt': ticket, 'authenticator_subkey': subkey, 'auth_data': None, 'ap_options': ap_options, } if send_seq_number: seq_number = random.randint(0, 0xfffffffe) else: seq_number = None ap_req = self.generate_ap_req(kdc_exchange_dict, None, req_body=None, armor=False, usage=KU_AP_REQ_AUTH, seq_number=seq_number) self.connect(self.host, port=464) self.assertIsNotNone(self.s) family = self.s.family if family == socket.AF_INET: addr_type = 2 # IPv4 elif family == socket.AF_INET6: addr_type = 24 # IPv6 else: self.fail(f'unknown family {family}') def create_address(ip): return { 'addr-type': addr_type, 'address': socket.inet_pton(family, ip), } local_ip = self.s.getsockname()[0] local_address = create_address(local_ip) # remote_ip = self.s.getpeername()[0] # remote_address = create_address(remote_ip) # TODO: due to a bug (?), MIT Kerberos will not accept the request # unless r-address is set to our _local_ address. Heimdal, on the other # hand, requires the r-address is set to the remote address (as # expected). To avoid problems, avoid sending r-address for now. remote_address = None msg = self.kpasswd_create(subkey, user_data, version, seq_number, ap_req, local_address, remote_address) self.send_msg(msg) rep_pdu = self.recv_pdu_raw() self._disconnect('transaction done') self.assertIsNotNone(rep_pdu) header = rep_pdu[:6] reply = rep_pdu[6:] reply_len = (header[0] << 8) | header[1] reply_version = (header[2] << 8) | header[3] ap_rep_len = (header[4] << 8) | header[5] self.assertEqual(reply_len, len(rep_pdu)) self.assertEqual(1, reply_version) # KRB5_KPASSWD_VERS_CHANGEPW self.assertLess(ap_rep_len, reply_len) self.assertNotEqual(0x7e, rep_pdu[1]) self.assertNotEqual(0x5e, rep_pdu[1]) if ap_rep_len: # We received an AP-REQ and KRB-PRIV as a response. This may or may # not indicate an error, depending on the status code. ap_rep = reply[:ap_rep_len] krb_priv = reply[ap_rep_len:] key = ticket.session_key ap_rep = self.der_decode(ap_rep, asn1Spec=krb5_asn1.AP_REP()) self.assertElementEqual(ap_rep, 'msg-type', KRB_AP_REP) enc_part = self.get_enc_part(ap_rep, key, KU_AP_REQ_ENC_PART) enc_part = self.der_decode( enc_part, asn1Spec=krb5_asn1.EncAPRepPart()) self.assertElementPresent(enc_part, 'ctime') self.assertElementPresent(enc_part, 'cusec') # self.assertElementMissing(enc_part, 'subkey') # TODO # self.assertElementPresent(enc_part, 'seq-number') # TODO try: krb_priv = self.der_decode(krb_priv, asn1Spec=krb5_asn1.KRB_PRIV()) except PyAsn1Error: self.fail() self.assertElementEqual(krb_priv, 'msg-type', KRB_PRIV) priv_enc_part = self.get_enc_part(krb_priv, subkey, KU_KRB_PRIV) priv_enc_part = self.der_decode( priv_enc_part, asn1Spec=krb5_asn1.EncKrbPrivPart()) self.assertElementMissing(priv_enc_part, 'timestamp') self.assertElementMissing(priv_enc_part, 'usec') # self.assertElementPresent(priv_enc_part, 'seq-number') # TODO # self.assertElementEqual(priv_enc_part, 's-address', remote_address) # TODO # self.assertElementMissing(priv_enc_part, 'r-address') # TODO result_data = priv_enc_part['user-data'] else: # We received a KRB-ERROR as a response, indicating an error. krb_error = self.der_decode(reply, asn1Spec=krb5_asn1.KRB_ERROR()) sname = self.PrincipalName_create( name_type=NT_PRINCIPAL, names=['kadmin', 'changepw']) realm = self.get_krbtgt_creds().get_realm().upper() self.assertElementEqual(krb_error, 'pvno', 5) self.assertElementEqual(krb_error, 'msg-type', KRB_ERROR) self.assertElementMissing(krb_error, 'ctime') self.assertElementMissing(krb_error, 'usec') self.assertElementPresent(krb_error, 'stime') self.assertElementPresent(krb_error, 'susec') error_code = krb_error['error-code'] if isinstance(expected_code, int): self.assertEqual(error_code, expected_code) else: self.assertIn(error_code, expected_code) self.assertElementMissing(krb_error, 'crealm') self.assertElementMissing(krb_error, 'cname') self.assertElementEqual(krb_error, 'realm', realm.encode('utf-8')) self.assertElementEqualPrincipal(krb_error, 'sname', sname) self.assertElementMissing(krb_error, 'e-text') result_data = krb_error['e-data'] status = result_data[:2] message = result_data[2:] status_code = (status[0] << 8) | status[1] if isinstance(expected_code, int): self.assertEqual(status_code, expected_code) else: self.assertIn(status_code, expected_code) if not message: self.assertEqual(0, status_code, 'got an error result, but no message') return # Check the first character of the message. if message[0]: if isinstance(expected_msg, bytes): self.assertEqual(message, expected_msg) else: self.assertIn(message, expected_msg) else: # We got AD password policy information. self.assertEqual(30, len(message)) (empty_bytes, min_length, history_length, properties, expire_time, min_age) = struct.unpack('>HIIIQQ', message) def _generic_kdc_exchange(self, kdc_exchange_dict, # required cname=None, # optional realm=None, # required sname=None, # optional from_time=None, # optional till_time=None, # required renew_time=None, # optional etypes=None, # required addresses=None, # optional additional_tickets=None, # optional EncAuthorizationData=None, # optional EncAuthorizationData_key=None, # optional EncAuthorizationData_usage=None): # optional check_error_fn = kdc_exchange_dict['check_error_fn'] check_rep_fn = kdc_exchange_dict['check_rep_fn'] generate_fast_fn = kdc_exchange_dict['generate_fast_fn'] generate_fast_armor_fn = kdc_exchange_dict['generate_fast_armor_fn'] generate_fast_padata_fn = kdc_exchange_dict['generate_fast_padata_fn'] generate_padata_fn = kdc_exchange_dict['generate_padata_fn'] callback_dict = kdc_exchange_dict['callback_dict'] req_msg_type = kdc_exchange_dict['req_msg_type'] req_asn1Spec = kdc_exchange_dict['req_asn1Spec'] rep_msg_type = kdc_exchange_dict['rep_msg_type'] expected_error_mode = kdc_exchange_dict['expected_error_mode'] kdc_options = kdc_exchange_dict['kdc_options'] pac_request = kdc_exchange_dict['pac_request'] pac_options = kdc_exchange_dict['pac_options'] # Parameters specific to the inner request body inner_req = kdc_exchange_dict['inner_req'] # Parameters specific to the outer request body outer_req = kdc_exchange_dict['outer_req'] if till_time is None: till_time = self.get_KerberosTime(offset=36000) if 'nonce' in kdc_exchange_dict: nonce = kdc_exchange_dict['nonce'] else: nonce = self.get_Nonce() kdc_exchange_dict['nonce'] = nonce req_body = self.KDC_REQ_BODY_create( kdc_options=kdc_options, cname=cname, realm=realm, sname=sname, from_time=from_time, till_time=till_time, renew_time=renew_time, nonce=nonce, etypes=etypes, addresses=addresses, additional_tickets=additional_tickets, EncAuthorizationData=EncAuthorizationData, EncAuthorizationData_key=EncAuthorizationData_key, EncAuthorizationData_usage=EncAuthorizationData_usage) inner_req_body = dict(req_body) if inner_req is not None: for key, value in inner_req.items(): if value is not None: inner_req_body[key] = value else: del inner_req_body[key] if outer_req is not None: for key, value in outer_req.items(): if value is not None: req_body[key] = value else: del req_body[key] additional_padata = [] if pac_request is not None: pa_pac_request = self.KERB_PA_PAC_REQUEST_create(pac_request) additional_padata.append(pa_pac_request) if pac_options is not None: pa_pac_options = self.get_pa_pac_options(pac_options) additional_padata.append(pa_pac_options) if req_msg_type == KRB_AS_REQ: tgs_req = None tgs_req_padata = None else: self.assertEqual(KRB_TGS_REQ, req_msg_type) tgs_req = self.generate_ap_req(kdc_exchange_dict, callback_dict, req_body, armor=False) tgs_req_padata = self.PA_DATA_create(PADATA_KDC_REQ, tgs_req) if generate_fast_padata_fn is not None: self.assertIsNotNone(generate_fast_fn) # This can alter req_body... fast_padata, req_body = generate_fast_padata_fn(kdc_exchange_dict, callback_dict, req_body) else: fast_padata = [] if generate_fast_armor_fn is not None: self.assertIsNotNone(generate_fast_fn) fast_ap_req = generate_fast_armor_fn(kdc_exchange_dict, callback_dict, None, armor=True) fast_armor_type = kdc_exchange_dict['fast_armor_type'] fast_armor = self.KRB_FAST_ARMOR_create(fast_armor_type, fast_ap_req) else: fast_armor = None if generate_padata_fn is not None: # This can alter req_body... outer_padata, req_body = generate_padata_fn(kdc_exchange_dict, callback_dict, req_body) self.assertIsNotNone(outer_padata) self.assertNotIn(PADATA_KDC_REQ, [pa['padata-type'] for pa in outer_padata], 'Don\'t create TGS-REQ manually') else: outer_padata = None if generate_fast_fn is not None: armor_key = kdc_exchange_dict['armor_key'] self.assertIsNotNone(armor_key) if req_msg_type == KRB_AS_REQ: checksum_blob = self.der_encode( req_body, asn1Spec=krb5_asn1.KDC_REQ_BODY()) else: self.assertEqual(KRB_TGS_REQ, req_msg_type) checksum_blob = tgs_req checksum = self.Checksum_create(armor_key, KU_FAST_REQ_CHKSUM, checksum_blob) fast_padata += additional_padata fast = generate_fast_fn(kdc_exchange_dict, callback_dict, inner_req_body, fast_padata, fast_armor, checksum) else: fast = None padata = [] if tgs_req_padata is not None: padata.append(tgs_req_padata) if fast is not None: padata.append(fast) if outer_padata is not None: padata += outer_padata if fast is None: padata += additional_padata if not padata: padata = None kdc_exchange_dict['req_padata'] = padata kdc_exchange_dict['fast_padata'] = fast_padata kdc_exchange_dict['req_body'] = inner_req_body req_obj, req_decoded = self.KDC_REQ_create(msg_type=req_msg_type, padata=padata, req_body=req_body, asn1Spec=req_asn1Spec()) kdc_exchange_dict['req_obj'] = req_obj to_rodc = kdc_exchange_dict['to_rodc'] rep = self.send_recv_transaction(req_decoded, to_rodc=to_rodc) self.assertIsNotNone(rep) msg_type = self.getElementValue(rep, 'msg-type') self.assertIsNotNone(msg_type) expected_msg_type = None if check_error_fn is not None: expected_msg_type = KRB_ERROR self.assertIsNone(check_rep_fn) self.assertNotEqual(0, len(expected_error_mode)) self.assertNotIn(0, expected_error_mode) if check_rep_fn is not None: expected_msg_type = rep_msg_type self.assertIsNone(check_error_fn) self.assertEqual(0, len(expected_error_mode)) self.assertIsNotNone(expected_msg_type) if msg_type == KRB_ERROR: error_code = self.getElementValue(rep, 'error-code') fail_msg = f'Got unexpected error: {error_code}' else: fail_msg = f'Expected to fail with error: {expected_error_mode}' self.assertEqual(msg_type, expected_msg_type, fail_msg) if msg_type == KRB_ERROR: return check_error_fn(kdc_exchange_dict, callback_dict, rep) return check_rep_fn(kdc_exchange_dict, callback_dict, rep) def as_exchange_dict(self, expected_crealm=None, expected_cname=None, expected_anon=False, expected_srealm=None, expected_sname=None, expected_account_name=None, expected_groups=None, unexpected_groups=None, expected_upn_name=None, expected_sid=None, expected_domain_sid=None, expected_supported_etypes=None, expected_flags=None, unexpected_flags=None, ticket_decryption_key=None, expect_ticket_checksum=None, expect_full_checksum=None, generate_fast_fn=None, generate_fast_armor_fn=None, generate_fast_padata_fn=None, fast_armor_type=FX_FAST_ARMOR_AP_REQUEST, generate_padata_fn=None, check_error_fn=None, check_rep_fn=None, check_kdc_private_fn=None, callback_dict=None, expected_error_mode=0, expected_status=None, client_as_etypes=None, expected_salt=None, authenticator_subkey=None, preauth_key=None, armor_key=None, armor_tgt=None, armor_subkey=None, auth_data=None, kdc_options='', inner_req=None, outer_req=None, pac_request=None, pac_options=None, ap_options=None, fast_ap_options=None, strict_edata_checking=True, expect_edata=None, expect_pac=True, expect_client_claims=None, expect_device_info=None, expect_device_claims=None, expect_upn_dns_info_ex=None, expect_pac_attrs=None, expect_pac_attrs_pac_request=None, expect_requester_sid=None, rc4_support=True, expected_client_claims=None, unexpected_client_claims=None, expected_device_claims=None, unexpected_device_claims=None, to_rodc=False): if expected_error_mode == 0: expected_error_mode = () elif not isinstance(expected_error_mode, collections.abc.Container): expected_error_mode = (expected_error_mode,) kdc_exchange_dict = { 'req_msg_type': KRB_AS_REQ, 'req_asn1Spec': krb5_asn1.AS_REQ, 'rep_msg_type': KRB_AS_REP, 'rep_asn1Spec': krb5_asn1.AS_REP, 'rep_encpart_asn1Spec': krb5_asn1.EncASRepPart, 'expected_crealm': expected_crealm, 'expected_cname': expected_cname, 'expected_anon': expected_anon, 'expected_srealm': expected_srealm, 'expected_sname': expected_sname, 'expected_account_name': expected_account_name, 'expected_groups': expected_groups, 'unexpected_groups': unexpected_groups, 'expected_upn_name': expected_upn_name, 'expected_sid': expected_sid, 'expected_domain_sid': expected_domain_sid, 'expected_supported_etypes': expected_supported_etypes, 'expected_flags': expected_flags, 'unexpected_flags': unexpected_flags, 'ticket_decryption_key': ticket_decryption_key, 'expect_ticket_checksum': expect_ticket_checksum, 'expect_full_checksum': expect_full_checksum, 'generate_fast_fn': generate_fast_fn, 'generate_fast_armor_fn': generate_fast_armor_fn, 'generate_fast_padata_fn': generate_fast_padata_fn, 'fast_armor_type': fast_armor_type, 'generate_padata_fn': generate_padata_fn, 'check_error_fn': check_error_fn, 'check_rep_fn': check_rep_fn, 'check_kdc_private_fn': check_kdc_private_fn, 'callback_dict': callback_dict, 'expected_error_mode': expected_error_mode, 'expected_status': expected_status, 'client_as_etypes': client_as_etypes, 'expected_salt': expected_salt, 'authenticator_subkey': authenticator_subkey, 'preauth_key': preauth_key, 'armor_key': armor_key, 'armor_tgt': armor_tgt, 'armor_subkey': armor_subkey, 'auth_data': auth_data, 'kdc_options': kdc_options, 'inner_req': inner_req, 'outer_req': outer_req, 'pac_request': pac_request, 'pac_options': pac_options, 'ap_options': ap_options, 'fast_ap_options': fast_ap_options, 'strict_edata_checking': strict_edata_checking, 'expect_edata': expect_edata, 'expect_pac': expect_pac, 'expect_client_claims': expect_client_claims, 'expect_device_info': expect_device_info, 'expect_device_claims': expect_device_claims, 'expect_upn_dns_info_ex': expect_upn_dns_info_ex, 'expect_pac_attrs': expect_pac_attrs, 'expect_pac_attrs_pac_request': expect_pac_attrs_pac_request, 'expect_requester_sid': expect_requester_sid, 'rc4_support': rc4_support, 'expected_client_claims': expected_client_claims, 'unexpected_client_claims': unexpected_client_claims, 'expected_device_claims': expected_device_claims, 'unexpected_device_claims': unexpected_device_claims, 'to_rodc': to_rodc } if callback_dict is None: callback_dict = {} return kdc_exchange_dict def tgs_exchange_dict(self, expected_crealm=None, expected_cname=None, expected_anon=False, expected_srealm=None, expected_sname=None, expected_account_name=None, expected_groups=None, unexpected_groups=None, expected_upn_name=None, expected_sid=None, expected_domain_sid=None, expected_supported_etypes=None, expected_flags=None, unexpected_flags=None, ticket_decryption_key=None, expect_ticket_checksum=None, expect_full_checksum=None, generate_fast_fn=None, generate_fast_armor_fn=None, generate_fast_padata_fn=None, fast_armor_type=FX_FAST_ARMOR_AP_REQUEST, generate_padata_fn=None, check_error_fn=None, check_rep_fn=None, check_kdc_private_fn=None, expected_error_mode=0, expected_status=None, callback_dict=None, tgt=None, armor_key=None, armor_tgt=None, armor_subkey=None, authenticator_subkey=None, auth_data=None, body_checksum_type=None, kdc_options='', inner_req=None, outer_req=None, pac_request=None, pac_options=None, ap_options=None, fast_ap_options=None, strict_edata_checking=True, expect_edata=None, expect_pac=True, expect_client_claims=None, expect_device_info=None, expect_device_claims=None, expect_upn_dns_info_ex=None, expect_pac_attrs=None, expect_pac_attrs_pac_request=None, expect_requester_sid=None, expected_proxy_target=None, expected_transited_services=None, rc4_support=True, expected_client_claims=None, unexpected_client_claims=None, expected_device_claims=None, unexpected_device_claims=None, to_rodc=False): if expected_error_mode == 0: expected_error_mode = () elif not isinstance(expected_error_mode, collections.abc.Container): expected_error_mode = (expected_error_mode,) kdc_exchange_dict = { 'req_msg_type': KRB_TGS_REQ, 'req_asn1Spec': krb5_asn1.TGS_REQ, 'rep_msg_type': KRB_TGS_REP, 'rep_asn1Spec': krb5_asn1.TGS_REP, 'rep_encpart_asn1Spec': krb5_asn1.EncTGSRepPart, 'expected_crealm': expected_crealm, 'expected_cname': expected_cname, 'expected_anon': expected_anon, 'expected_srealm': expected_srealm, 'expected_sname': expected_sname, 'expected_account_name': expected_account_name, 'expected_groups': expected_groups, 'unexpected_groups': unexpected_groups, 'expected_upn_name': expected_upn_name, 'expected_sid': expected_sid, 'expected_domain_sid': expected_domain_sid, 'expected_supported_etypes': expected_supported_etypes, 'expected_flags': expected_flags, 'unexpected_flags': unexpected_flags, 'ticket_decryption_key': ticket_decryption_key, 'expect_ticket_checksum': expect_ticket_checksum, 'expect_full_checksum': expect_full_checksum, 'generate_fast_fn': generate_fast_fn, 'generate_fast_armor_fn': generate_fast_armor_fn, 'generate_fast_padata_fn': generate_fast_padata_fn, 'fast_armor_type': fast_armor_type, 'generate_padata_fn': generate_padata_fn, 'check_error_fn': check_error_fn, 'check_rep_fn': check_rep_fn, 'check_kdc_private_fn': check_kdc_private_fn, 'callback_dict': callback_dict, 'expected_error_mode': expected_error_mode, 'expected_status': expected_status, 'tgt': tgt, 'body_checksum_type': body_checksum_type, 'armor_key': armor_key, 'armor_tgt': armor_tgt, 'armor_subkey': armor_subkey, 'auth_data': auth_data, 'authenticator_subkey': authenticator_subkey, 'kdc_options': kdc_options, 'inner_req': inner_req, 'outer_req': outer_req, 'pac_request': pac_request, 'pac_options': pac_options, 'ap_options': ap_options, 'fast_ap_options': fast_ap_options, 'strict_edata_checking': strict_edata_checking, 'expect_edata': expect_edata, 'expect_pac': expect_pac, 'expect_client_claims': expect_client_claims, 'expect_device_info': expect_device_info, 'expect_device_claims': expect_device_claims, 'expect_upn_dns_info_ex': expect_upn_dns_info_ex, 'expect_pac_attrs': expect_pac_attrs, 'expect_pac_attrs_pac_request': expect_pac_attrs_pac_request, 'expect_requester_sid': expect_requester_sid, 'expected_proxy_target': expected_proxy_target, 'expected_transited_services': expected_transited_services, 'rc4_support': rc4_support, 'expected_client_claims': expected_client_claims, 'unexpected_client_claims': unexpected_client_claims, 'expected_device_claims': expected_device_claims, 'unexpected_device_claims': unexpected_device_claims, 'to_rodc': to_rodc } if callback_dict is None: callback_dict = {} return kdc_exchange_dict def generic_check_kdc_rep(self, kdc_exchange_dict, callback_dict, rep): expected_crealm = kdc_exchange_dict['expected_crealm'] expected_anon = kdc_exchange_dict['expected_anon'] expected_srealm = kdc_exchange_dict['expected_srealm'] expected_sname = kdc_exchange_dict['expected_sname'] ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key'] check_kdc_private_fn = kdc_exchange_dict['check_kdc_private_fn'] rep_encpart_asn1Spec = kdc_exchange_dict['rep_encpart_asn1Spec'] msg_type = kdc_exchange_dict['rep_msg_type'] armor_key = kdc_exchange_dict['armor_key'] self.assertElementEqual(rep, 'msg-type', msg_type) # AS-REP | TGS-REP padata = self.getElementValue(rep, 'padata') if self.strict_checking: self.assertElementEqualUTF8(rep, 'crealm', expected_crealm) if self.cname_checking: if expected_anon: expected_cname = self.PrincipalName_create( name_type=NT_WELLKNOWN, names=['WELLKNOWN', 'ANONYMOUS']) else: expected_cname = kdc_exchange_dict['expected_cname'] self.assertElementEqualPrincipal(rep, 'cname', expected_cname) self.assertElementPresent(rep, 'ticket') ticket = self.getElementValue(rep, 'ticket') ticket_encpart = None ticket_cipher = None self.assertIsNotNone(ticket) if ticket is not None: # Never None, but gives indentation self.assertElementEqual(ticket, 'tkt-vno', 5) self.assertElementEqualUTF8(ticket, 'realm', expected_srealm) self.assertElementEqualPrincipal(ticket, 'sname', expected_sname) self.assertElementPresent(ticket, 'enc-part') ticket_encpart = self.getElementValue(ticket, 'enc-part') self.assertIsNotNone(ticket_encpart) if ticket_encpart is not None: # Never None, but gives indentation self.assertElementPresent(ticket_encpart, 'etype') kdc_options = kdc_exchange_dict['kdc_options'] pos = len(tuple(krb5_asn1.KDCOptions('enc-tkt-in-skey'))) - 1 expect_kvno = (pos >= len(kdc_options) or kdc_options[pos] != '1') if expect_kvno: # 'unspecified' means present, with any value != 0 self.assertElementKVNO(ticket_encpart, 'kvno', self.unspecified_kvno) else: # For user-to-user, don't expect a kvno. self.assertElementMissing(ticket_encpart, 'kvno') self.assertElementPresent(ticket_encpart, 'cipher') ticket_cipher = self.getElementValue(ticket_encpart, 'cipher') self.assertElementPresent(rep, 'enc-part') encpart = self.getElementValue(rep, 'enc-part') encpart_cipher = None self.assertIsNotNone(encpart) if encpart is not None: # Never None, but gives indentation self.assertElementPresent(encpart, 'etype') self.assertElementKVNO(ticket_encpart, 'kvno', 'autodetect') self.assertElementPresent(encpart, 'cipher') encpart_cipher = self.getElementValue(encpart, 'cipher') if self.padata_checking: self.check_reply_padata(kdc_exchange_dict, callback_dict, encpart, padata) ticket_checksum = None # Get the decryption key for the encrypted part encpart_decryption_key, encpart_decryption_usage = ( self.get_preauth_key(kdc_exchange_dict)) if armor_key is not None: pa_dict = self.get_pa_dict(padata) if PADATA_FX_FAST in pa_dict: fx_fast_data = pa_dict[PADATA_FX_FAST] fast_response = self.check_fx_fast_data(kdc_exchange_dict, fx_fast_data, armor_key, finished=True) if 'strengthen-key' in fast_response: strengthen_key = self.EncryptionKey_import( fast_response['strengthen-key']) encpart_decryption_key = ( self.generate_strengthen_reply_key( strengthen_key, encpart_decryption_key)) fast_finished = fast_response.get('finished') if fast_finished is not None: ticket_checksum = fast_finished['ticket-checksum'] self.check_rep_padata(kdc_exchange_dict, callback_dict, fast_response['padata'], error_code=0) ticket_private = None if ticket_decryption_key is not None: self.assertElementEqual(ticket_encpart, 'etype', ticket_decryption_key.etype) self.assertElementKVNO(ticket_encpart, 'kvno', ticket_decryption_key.kvno) ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET, ticket_cipher) ticket_private = self.der_decode( ticket_decpart, asn1Spec=krb5_asn1.EncTicketPart()) encpart_private = None self.assertIsNotNone(encpart_decryption_key) if encpart_decryption_key is not None: self.assertElementEqual(encpart, 'etype', encpart_decryption_key.etype) if self.strict_checking: self.assertElementKVNO(encpart, 'kvno', encpart_decryption_key.kvno) rep_decpart = encpart_decryption_key.decrypt( encpart_decryption_usage, encpart_cipher) # MIT KDC encodes both EncASRepPart and EncTGSRepPart with # application tag 26 try: encpart_private = self.der_decode( rep_decpart, asn1Spec=rep_encpart_asn1Spec()) except Exception: encpart_private = self.der_decode( rep_decpart, asn1Spec=krb5_asn1.EncTGSRepPart()) kdc_exchange_dict['reply_key'] = encpart_decryption_key self.assertIsNotNone(check_kdc_private_fn) if check_kdc_private_fn is not None: check_kdc_private_fn(kdc_exchange_dict, callback_dict, rep, ticket_private, encpart_private, ticket_checksum) return rep def check_fx_fast_data(self, kdc_exchange_dict, fx_fast_data, armor_key, finished=False, expect_strengthen_key=True): fx_fast_data = self.der_decode(fx_fast_data, asn1Spec=krb5_asn1.PA_FX_FAST_REPLY()) enc_fast_rep = fx_fast_data['armored-data']['enc-fast-rep'] self.assertEqual(enc_fast_rep['etype'], armor_key.etype) fast_rep = armor_key.decrypt(KU_FAST_REP, enc_fast_rep['cipher']) fast_response = self.der_decode(fast_rep, asn1Spec=krb5_asn1.KrbFastResponse()) if expect_strengthen_key and self.strict_checking: self.assertIn('strengthen-key', fast_response) if finished: self.assertIn('finished', fast_response) # Ensure that the nonce matches the nonce in the body of the request # (RFC6113 5.4.3). nonce = kdc_exchange_dict['nonce'] self.assertEqual(nonce, fast_response['nonce']) return fast_response def generic_check_kdc_private(self, kdc_exchange_dict, callback_dict, rep, ticket_private, encpart_private, ticket_checksum): kdc_options = kdc_exchange_dict['kdc_options'] canon_pos = len(tuple(krb5_asn1.KDCOptions('canonicalize'))) - 1 canonicalize = (canon_pos < len(kdc_options) and kdc_options[canon_pos] == '1') renewable_pos = len(tuple(krb5_asn1.KDCOptions('renewable'))) - 1 renewable = (renewable_pos < len(kdc_options) and kdc_options[renewable_pos] == '1') renew_pos = len(tuple(krb5_asn1.KDCOptions('renew'))) - 1 renew = (renew_pos < len(kdc_options) and kdc_options[renew_pos] == '1') expect_renew_till = renewable or renew expected_crealm = kdc_exchange_dict['expected_crealm'] expected_cname = kdc_exchange_dict['expected_cname'] expected_srealm = kdc_exchange_dict['expected_srealm'] expected_sname = kdc_exchange_dict['expected_sname'] ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key'] rep_msg_type = kdc_exchange_dict['rep_msg_type'] expected_flags = kdc_exchange_dict.get('expected_flags') unexpected_flags = kdc_exchange_dict.get('unexpected_flags') ticket = self.getElementValue(rep, 'ticket') if ticket_checksum is not None: armor_key = kdc_exchange_dict['armor_key'] self.verify_ticket_checksum(ticket, ticket_checksum, armor_key) to_rodc = kdc_exchange_dict['to_rodc'] if to_rodc: krbtgt_creds = self.get_rodc_krbtgt_creds() else: krbtgt_creds = self.get_krbtgt_creds() krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds) krbtgt_keys = [krbtgt_key] if not self.strict_checking: krbtgt_key_rc4 = self.TicketDecryptionKey_from_creds( krbtgt_creds, etype=kcrypto.Enctype.RC4) krbtgt_keys.append(krbtgt_key_rc4) if self.expect_pac and self.is_tgs(expected_sname): expect_pac = True else: expect_pac = kdc_exchange_dict['expect_pac'] ticket_session_key = None if ticket_private is not None: self.assertElementFlags(ticket_private, 'flags', expected_flags, unexpected_flags) self.assertElementPresent(ticket_private, 'key') ticket_key = self.getElementValue(ticket_private, 'key') self.assertIsNotNone(ticket_key) if ticket_key is not None: # Never None, but gives indentation self.assertElementPresent(ticket_key, 'keytype') self.assertElementPresent(ticket_key, 'keyvalue') ticket_session_key = self.EncryptionKey_import(ticket_key) self.assertElementEqualUTF8(ticket_private, 'crealm', expected_crealm) if self.cname_checking: self.assertElementEqualPrincipal(ticket_private, 'cname', expected_cname) self.assertElementPresent(ticket_private, 'transited') self.assertElementPresent(ticket_private, 'authtime') if self.strict_checking: self.assertElementPresent(ticket_private, 'starttime') self.assertElementPresent(ticket_private, 'endtime') if self.strict_checking: if expect_renew_till: self.assertElementPresent(ticket_private, 'renew-till') else: self.assertElementMissing(ticket_private, 'renew-till') if self.strict_checking: self.assertElementEqual(ticket_private, 'caddr', []) if expect_pac is not None: if expect_pac: self.assertElementPresent(ticket_private, 'authorization-data', expect_empty=not expect_pac) else: # It is more correct to not have an authorization-data # present than an empty one. # # https://github.com/krb5/krb5/pull/1225#issuecomment-995104193 v = self.getElementValue(ticket_private, 'authorization-data') if v is not None: self.assertElementPresent(ticket_private, 'authorization-data', expect_empty=True) encpart_session_key = None if encpart_private is not None: self.assertElementPresent(encpart_private, 'key') encpart_key = self.getElementValue(encpart_private, 'key') self.assertIsNotNone(encpart_key) if encpart_key is not None: # Never None, but gives indentation self.assertElementPresent(encpart_key, 'keytype') self.assertElementPresent(encpart_key, 'keyvalue') encpart_session_key = self.EncryptionKey_import(encpart_key) self.assertElementPresent(encpart_private, 'last-req') self.assertElementEqual(encpart_private, 'nonce', kdc_exchange_dict['nonce']) if rep_msg_type == KRB_AS_REP: if self.strict_checking: self.assertElementPresent(encpart_private, 'key-expiration') else: self.assertElementMissing(encpart_private, 'key-expiration') self.assertElementFlags(encpart_private, 'flags', expected_flags, unexpected_flags) self.assertElementPresent(encpart_private, 'authtime') if self.strict_checking: self.assertElementPresent(encpart_private, 'starttime') self.assertElementPresent(encpart_private, 'endtime') if self.strict_checking: if expect_renew_till: self.assertElementPresent(encpart_private, 'renew-till') else: self.assertElementMissing(encpart_private, 'renew-till') self.assertElementEqualUTF8(encpart_private, 'srealm', expected_srealm) self.assertElementEqualPrincipal(encpart_private, 'sname', expected_sname) if self.strict_checking: self.assertElementEqual(encpart_private, 'caddr', []) sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict) sent_enc_pa_rep = self.sent_enc_pa_rep(kdc_exchange_dict) enc_padata = self.getElementValue(encpart_private, 'encrypted-pa-data') if (canonicalize or '1' in sent_pac_options or ( rep_msg_type == KRB_AS_REP and sent_enc_pa_rep)): if self.strict_checking: self.assertIsNotNone(enc_padata) if enc_padata is not None: enc_pa_dict = self.get_pa_dict(enc_padata) if self.strict_checking: if canonicalize: self.assertIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict) else: self.assertNotIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict) if '1' in sent_pac_options: self.assertIn(PADATA_PAC_OPTIONS, enc_pa_dict) else: self.assertNotIn(PADATA_PAC_OPTIONS, enc_pa_dict) if rep_msg_type == KRB_AS_REP and sent_enc_pa_rep: self.assertIn(PADATA_REQ_ENC_PA_REP, enc_pa_dict) else: self.assertNotIn(PADATA_REQ_ENC_PA_REP, enc_pa_dict) if PADATA_SUPPORTED_ETYPES in enc_pa_dict: expected_supported_etypes = kdc_exchange_dict[ 'expected_supported_etypes'] expected_supported_etypes |= ( security.KERB_ENCTYPE_DES_CBC_CRC | security.KERB_ENCTYPE_DES_CBC_MD5 | security.KERB_ENCTYPE_RC4_HMAC_MD5) (supported_etypes,) = struct.unpack( ' expected_aes_type: expected_aes_type = etype if etype in (kcrypto.Enctype.RC4,) and error_code != 0: if etype > expected_rc4_type and rc4_support: expected_rc4_type = etype if expected_aes_type != 0: expect_etype_info2 += (expected_aes_type,) if expected_rc4_type != 0: expect_etype_info2 += (expected_rc4_type,) expected_patypes = () if sent_fast and error_code != 0: expected_patypes += (PADATA_FX_ERROR,) expected_patypes += (PADATA_FX_COOKIE,) if rep_msg_type == KRB_TGS_REP: sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict) if ('1' in sent_pac_options and error_code not in (0, KDC_ERR_GENERIC)): expected_patypes += (PADATA_PAC_OPTIONS,) elif error_code != KDC_ERR_GENERIC: if expect_etype_info: if rc4_support: self.assertGreater(len(expect_etype_info2), 0) expected_patypes += (PADATA_ETYPE_INFO,) if len(expect_etype_info2) != 0: expected_patypes += (PADATA_ETYPE_INFO2,) if error_code not in (KDC_ERR_PREAUTH_FAILED, KDC_ERR_SKEW, KDC_ERR_POLICY, KDC_ERR_CLIENT_REVOKED): if sent_fast: expected_patypes += (PADATA_ENCRYPTED_CHALLENGE,) else: expected_patypes += (PADATA_ENC_TIMESTAMP,) if not sent_enc_challenge: expected_patypes += (PADATA_PK_AS_REQ,) expected_patypes += (PADATA_PK_AS_REP_19,) if (self.kdc_fast_support and not sent_fast and not sent_enc_challenge): expected_patypes += (PADATA_FX_FAST,) expected_patypes += (PADATA_FX_COOKIE,) require_strict = {PADATA_FX_COOKIE, PADATA_FX_FAST, PADATA_PAC_OPTIONS, PADATA_PK_AS_REP_19, PADATA_PK_AS_REQ, PADATA_PKINIT_KX, PADATA_GSS} strict_edata_checking = kdc_exchange_dict['strict_edata_checking'] if not strict_edata_checking: require_strict.add(PADATA_ETYPE_INFO2) require_strict.add(PADATA_ENCRYPTED_CHALLENGE) got_patypes = tuple(pa['padata-type'] for pa in rep_padata) self.assertSequenceElementsEqual(expected_patypes, got_patypes, require_strict=require_strict) if not expected_patypes: return None pa_dict = self.get_pa_dict(rep_padata) enc_timestamp = pa_dict.get(PADATA_ENC_TIMESTAMP) if enc_timestamp is not None: self.assertEqual(len(enc_timestamp), 0) pk_as_req = pa_dict.get(PADATA_PK_AS_REQ) if pk_as_req is not None: self.assertEqual(len(pk_as_req), 0) pk_as_rep19 = pa_dict.get(PADATA_PK_AS_REP_19) if pk_as_rep19 is not None: self.assertEqual(len(pk_as_rep19), 0) fx_fast = pa_dict.get(PADATA_FX_FAST) if fx_fast is not None: self.assertEqual(len(fx_fast), 0) fast_cookie = pa_dict.get(PADATA_FX_COOKIE) if fast_cookie is not None: kdc_exchange_dict['fast_cookie'] = fast_cookie fast_error = pa_dict.get(PADATA_FX_ERROR) if fast_error is not None: fast_error = self.der_decode(fast_error, asn1Spec=krb5_asn1.KRB_ERROR()) self.generic_check_kdc_error(kdc_exchange_dict, callback_dict, fast_error, inner=True) pac_options = pa_dict.get(PADATA_PAC_OPTIONS) if pac_options is not None: pac_options = self.der_decode( pac_options, asn1Spec=krb5_asn1.PA_PAC_OPTIONS()) self.assertElementEqual(pac_options, 'options', sent_pac_options) enc_challenge = pa_dict.get(PADATA_ENCRYPTED_CHALLENGE) if enc_challenge is not None: if not sent_enc_challenge: self.assertEqual(len(enc_challenge), 0) else: armor_key = kdc_exchange_dict['armor_key'] self.assertIsNotNone(armor_key) preauth_key, _ = self.get_preauth_key(kdc_exchange_dict) kdc_challenge_key = self.generate_kdc_challenge_key( armor_key, preauth_key) # Ensure that the encrypted challenge FAST factor is supported # (RFC6113 5.4.6). if self.strict_checking: self.assertNotEqual(len(enc_challenge), 0) if len(enc_challenge) != 0: encrypted_challenge = self.der_decode( enc_challenge, asn1Spec=krb5_asn1.EncryptedData()) self.assertEqual(encrypted_challenge['etype'], kdc_challenge_key.etype) challenge = kdc_challenge_key.decrypt( KU_ENC_CHALLENGE_KDC, encrypted_challenge['cipher']) challenge = self.der_decode( challenge, asn1Spec=krb5_asn1.PA_ENC_TS_ENC()) # Retrieve the returned timestamp. rep_patime = challenge['patimestamp'] self.assertIn('pausec', challenge) # Ensure the returned time is within five minutes of the # current time. rep_time = self.get_EpochFromKerberosTime(rep_patime) current_time = time.time() self.assertLess(current_time - 300, rep_time) self.assertLess(rep_time, current_time + 300) etype_info2 = pa_dict.get(PADATA_ETYPE_INFO2) if etype_info2 is not None: etype_info2 = self.der_decode(etype_info2, asn1Spec=krb5_asn1.ETYPE_INFO2()) self.assertGreaterEqual(len(etype_info2), 1) if self.strict_checking: self.assertEqual(len(etype_info2), len(expect_etype_info2)) for i in range(0, len(etype_info2)): e = self.getElementValue(etype_info2[i], 'etype') if self.strict_checking: self.assertEqual(e, expect_etype_info2[i]) salt = self.getElementValue(etype_info2[i], 'salt') if e == kcrypto.Enctype.RC4: if self.strict_checking: self.assertIsNone(salt) else: self.assertIsNotNone(salt) expected_salt = kdc_exchange_dict['expected_salt'] if expected_salt is not None: self.assertEqual(salt, expected_salt) s2kparams = self.getElementValue(etype_info2[i], 's2kparams') if self.strict_checking: self.assertIsNone(s2kparams) etype_info = pa_dict.get(PADATA_ETYPE_INFO) if etype_info is not None: etype_info = self.der_decode(etype_info, asn1Spec=krb5_asn1.ETYPE_INFO()) self.assertEqual(len(etype_info), 1) e = self.getElementValue(etype_info[0], 'etype') self.assertEqual(e, kcrypto.Enctype.RC4) if rc4_support: self.assertEqual(e, expect_etype_info2[0]) salt = self.getElementValue(etype_info[0], 'salt') if self.strict_checking: self.assertIsNotNone(salt) self.assertEqual(len(salt), 0) return etype_info2 def generate_simple_fast(self, kdc_exchange_dict, _callback_dict, req_body, fast_padata, fast_armor, checksum, fast_options=''): armor_key = kdc_exchange_dict['armor_key'] fast_req = self.KRB_FAST_REQ_create(fast_options, fast_padata, req_body) fast_req = self.der_encode(fast_req, asn1Spec=krb5_asn1.KrbFastReq()) fast_req = self.EncryptedData_create(armor_key, KU_FAST_ENC, fast_req) fast_armored_req = self.KRB_FAST_ARMORED_REQ_create(fast_armor, checksum, fast_req) fx_fast_request = self.PA_FX_FAST_REQUEST_create(fast_armored_req) fx_fast_request = self.der_encode( fx_fast_request, asn1Spec=krb5_asn1.PA_FX_FAST_REQUEST()) fast_padata = self.PA_DATA_create(PADATA_FX_FAST, fx_fast_request) return fast_padata def generate_ap_req(self, kdc_exchange_dict, _callback_dict, req_body, armor, usage=None, seq_number=None): req_body_checksum = None if armor: self.assertIsNone(req_body) tgt = kdc_exchange_dict['armor_tgt'] authenticator_subkey = kdc_exchange_dict['armor_subkey'] else: tgt = kdc_exchange_dict['tgt'] authenticator_subkey = kdc_exchange_dict['authenticator_subkey'] if req_body is not None: body_checksum_type = kdc_exchange_dict['body_checksum_type'] req_body_blob = self.der_encode( req_body, asn1Spec=krb5_asn1.KDC_REQ_BODY()) req_body_checksum = self.Checksum_create( tgt.session_key, KU_TGS_REQ_AUTH_CKSUM, req_body_blob, ctype=body_checksum_type) auth_data = kdc_exchange_dict['auth_data'] subkey_obj = None if authenticator_subkey is not None: subkey_obj = authenticator_subkey.export_obj() if seq_number is None: seq_number = random.randint(0, 0xfffffffe) (ctime, cusec) = self.get_KerberosTimeWithUsec() authenticator_obj = self.Authenticator_create( crealm=tgt.crealm, cname=tgt.cname, cksum=req_body_checksum, cusec=cusec, ctime=ctime, subkey=subkey_obj, seq_number=seq_number, authorization_data=auth_data) authenticator_blob = self.der_encode( authenticator_obj, asn1Spec=krb5_asn1.Authenticator()) if usage is None: usage = KU_AP_REQ_AUTH if armor else KU_TGS_REQ_AUTH authenticator = self.EncryptedData_create(tgt.session_key, usage, authenticator_blob) if armor: ap_options = kdc_exchange_dict['fast_ap_options'] else: ap_options = kdc_exchange_dict['ap_options'] if ap_options is None: ap_options = str(krb5_asn1.APOptions('0')) ap_req_obj = self.AP_REQ_create(ap_options=ap_options, ticket=tgt.ticket, authenticator=authenticator) ap_req = self.der_encode(ap_req_obj, asn1Spec=krb5_asn1.AP_REQ()) return ap_req def generate_simple_tgs_padata(self, kdc_exchange_dict, callback_dict, req_body): ap_req = self.generate_ap_req(kdc_exchange_dict, callback_dict, req_body, armor=False) pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req) padata = [pa_tgs_req] return padata, req_body def get_preauth_key(self, kdc_exchange_dict): msg_type = kdc_exchange_dict['rep_msg_type'] if msg_type == KRB_AS_REP: key = kdc_exchange_dict['preauth_key'] usage = KU_AS_REP_ENC_PART else: # KRB_TGS_REP authenticator_subkey = kdc_exchange_dict['authenticator_subkey'] if authenticator_subkey is not None: key = authenticator_subkey usage = KU_TGS_REP_ENC_PART_SUB_KEY else: tgt = kdc_exchange_dict['tgt'] key = tgt.session_key usage = KU_TGS_REP_ENC_PART_SESSION self.assertIsNotNone(key) return key, usage def generate_armor_key(self, subkey, session_key): armor_key = kcrypto.cf2(subkey.key, session_key.key, b'subkeyarmor', b'ticketarmor') armor_key = Krb5EncryptionKey(armor_key, None) return armor_key def generate_strengthen_reply_key(self, strengthen_key, reply_key): strengthen_reply_key = kcrypto.cf2(strengthen_key.key, reply_key.key, b'strengthenkey', b'replykey') strengthen_reply_key = Krb5EncryptionKey(strengthen_reply_key, reply_key.kvno) return strengthen_reply_key def generate_client_challenge_key(self, armor_key, longterm_key): client_challenge_key = kcrypto.cf2(armor_key.key, longterm_key.key, b'clientchallengearmor', b'challengelongterm') client_challenge_key = Krb5EncryptionKey(client_challenge_key, None) return client_challenge_key def generate_kdc_challenge_key(self, armor_key, longterm_key): kdc_challenge_key = kcrypto.cf2(armor_key.key, longterm_key.key, b'kdcchallengearmor', b'challengelongterm') kdc_challenge_key = Krb5EncryptionKey(kdc_challenge_key, None) return kdc_challenge_key def verify_ticket_checksum(self, ticket, expected_checksum, armor_key): expected_type = expected_checksum['cksumtype'] self.assertEqual(armor_key.ctype, expected_type) ticket_blob = self.der_encode(ticket, asn1Spec=krb5_asn1.Ticket()) checksum = self.Checksum_create(armor_key, KU_FAST_FINISHED, ticket_blob) self.assertEqual(expected_checksum, checksum) def verify_ticket(self, ticket, krbtgt_keys, service_ticket, expect_pac=True, expect_ticket_checksum=True, expect_full_checksum=None): # Decrypt the ticket. key = ticket.decryption_key enc_part = ticket.ticket['enc-part'] self.assertElementEqual(enc_part, 'etype', key.etype) self.assertElementKVNO(enc_part, 'kvno', key.kvno) enc_part = key.decrypt(KU_TICKET, enc_part['cipher']) enc_part = self.der_decode( enc_part, asn1Spec=krb5_asn1.EncTicketPart()) # Fetch the authorization data from the ticket. auth_data = enc_part.get('authorization-data') if expect_pac: self.assertIsNotNone(auth_data) elif auth_data is None: return # Get a copy of the authdata with an empty PAC, and the existing PAC # (if present). empty_pac = self.get_empty_pac() auth_data, pac_data = self.replace_pac(auth_data, empty_pac, expect_pac=expect_pac) if not expect_pac: return # Unpack the PAC as both PAC_DATA and PAC_DATA_RAW types. We use the # raw type to create a new PAC with zeroed signatures for # verification. This is because on Windows, the resource_groups field # is added to PAC_LOGON_INFO after the info3 field has been created, # which results in a different ordering of pointer values than Samba # (see commit 0e201ecdc53). Using the raw type avoids changing # PAC_LOGON_INFO, so verification against Windows can work. We still # need the PAC_DATA type to retrieve the actual checksums, because the # signatures in the raw type may contain padding bytes. pac = ndr_unpack(krb5pac.PAC_DATA, pac_data) raw_pac = ndr_unpack(krb5pac.PAC_DATA_RAW, pac_data) checksums = {} full_checksum_buffer = None for pac_buffer, raw_pac_buffer in zip(pac.buffers, raw_pac.buffers): buffer_type = pac_buffer.type if buffer_type in self.pac_checksum_types: self.assertNotIn(buffer_type, checksums, f'Duplicate checksum type {buffer_type}') # Fetch the checksum and the checksum type from the PAC buffer. checksum = pac_buffer.info.signature ctype = pac_buffer.info.type if ctype & 1 << 31: ctype |= -1 << 31 checksums[buffer_type] = checksum, ctype if buffer_type == krb5pac.PAC_TYPE_FULL_CHECKSUM: full_checksum_buffer = raw_pac_buffer elif buffer_type != krb5pac.PAC_TYPE_TICKET_CHECKSUM: # Zero the checksum field so that we can later verify the # checksums. The ticket checksum field is not zeroed. signature = ndr_unpack( krb5pac.PAC_SIGNATURE_DATA, raw_pac_buffer.info.remaining) signature.signature = bytes(len(checksum)) raw_pac_buffer.info.remaining = ndr_pack( signature) # Re-encode the PAC. pac_data = ndr_pack(raw_pac) if full_checksum_buffer is not None: signature = ndr_unpack( krb5pac.PAC_SIGNATURE_DATA, full_checksum_buffer.info.remaining) signature.signature = bytes(len(checksum)) full_checksum_buffer.info.remaining = ndr_pack( signature) # Re-encode the PAC. full_pac_data = ndr_pack(raw_pac) # Verify the signatures. server_checksum, server_ctype = checksums[ krb5pac.PAC_TYPE_SRV_CHECKSUM] key.verify_checksum(KU_NON_KERB_CKSUM_SALT, pac_data, server_ctype, server_checksum) kdc_checksum, kdc_ctype = checksums[ krb5pac.PAC_TYPE_KDC_CHECKSUM] if isinstance(krbtgt_keys, collections.abc.Container): if self.strict_checking: krbtgt_key = krbtgt_keys[0] else: krbtgt_key = next(key for key in krbtgt_keys if key.ctype == kdc_ctype) else: krbtgt_key = krbtgt_keys krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT, server_checksum, kdc_ctype, kdc_checksum) if not service_ticket: self.assertNotIn(krb5pac.PAC_TYPE_TICKET_CHECKSUM, checksums) self.assertNotIn(krb5pac.PAC_TYPE_FULL_CHECKSUM, checksums) else: ticket_checksum, ticket_ctype = checksums.get( krb5pac.PAC_TYPE_TICKET_CHECKSUM, (None, None)) if expect_ticket_checksum: self.assertIsNotNone(ticket_checksum) elif expect_ticket_checksum is False: self.assertIsNone(ticket_checksum) if ticket_checksum is not None: enc_part['authorization-data'] = auth_data enc_part = self.der_encode(enc_part, asn1Spec=krb5_asn1.EncTicketPart()) krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT, enc_part, ticket_ctype, ticket_checksum) full_checksum, full_ctype = checksums.get( krb5pac.PAC_TYPE_FULL_CHECKSUM, (None, None)) if expect_full_checksum: self.assertIsNotNone(full_checksum) elif expect_full_checksum is False: self.assertIsNone(full_checksum) if full_checksum is not None: krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT, full_pac_data, full_ctype, full_checksum) def modified_ticket(self, ticket, *, new_ticket_key=None, modify_fn=None, modify_pac_fn=None, exclude_pac=False, allow_empty_authdata=False, update_pac_checksums=True, checksum_keys=None, include_checksums=None): if checksum_keys is None: # A dict containing a key for each checksum type to be created in # the PAC. checksum_keys = {} if include_checksums is None: # A dict containing a value for each checksum type; True if the # checksum type is to be included in the PAC, False if it is to be # excluded, or None/not present if the checksum is to be included # based on its presence in the original PAC. include_checksums = {} # Check that the values passed in by the caller make sense. self.assertLessEqual(checksum_keys.keys(), self.pac_checksum_types) self.assertLessEqual(include_checksums.keys(), self.pac_checksum_types) if exclude_pac: self.assertIsNone(modify_pac_fn) update_pac_checksums = False if not update_pac_checksums: self.assertFalse(checksum_keys) self.assertFalse(include_checksums) expect_pac = modify_pac_fn is not None key = ticket.decryption_key if new_ticket_key is None: # Use the same key to re-encrypt the ticket. new_ticket_key = key if krb5pac.PAC_TYPE_SRV_CHECKSUM not in checksum_keys: # If the server signature key is not present, fall back to the key # used to encrypt the ticket. checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM] = new_ticket_key if krb5pac.PAC_TYPE_TICKET_CHECKSUM not in checksum_keys: # If the ticket signature key is not present, fall back to the key # used for the KDC signature. kdc_checksum_key = checksum_keys.get(krb5pac.PAC_TYPE_KDC_CHECKSUM) if kdc_checksum_key is not None: checksum_keys[krb5pac.PAC_TYPE_TICKET_CHECKSUM] = ( kdc_checksum_key) if krb5pac.PAC_TYPE_FULL_CHECKSUM not in checksum_keys: # If the full signature key is not present, fall back to the key # used for the KDC signature. kdc_checksum_key = checksum_keys.get(krb5pac.PAC_TYPE_KDC_CHECKSUM) if kdc_checksum_key is not None: checksum_keys[krb5pac.PAC_TYPE_FULL_CHECKSUM] = ( kdc_checksum_key) # Decrypt the ticket. enc_part = ticket.ticket['enc-part'] self.assertElementEqual(enc_part, 'etype', key.etype) self.assertElementKVNO(enc_part, 'kvno', key.kvno) enc_part = key.decrypt(KU_TICKET, enc_part['cipher']) enc_part = self.der_decode( enc_part, asn1Spec=krb5_asn1.EncTicketPart()) # Modify the ticket here. if modify_fn is not None: enc_part = modify_fn(enc_part) auth_data = enc_part.get('authorization-data') if expect_pac: self.assertIsNotNone(auth_data) if auth_data is not None: new_pac = None if not exclude_pac: # Get a copy of the authdata with an empty PAC, and the # existing PAC (if present). empty_pac = self.get_empty_pac() empty_pac_auth_data, pac_data = self.replace_pac( auth_data, empty_pac, expect_pac=expect_pac) if pac_data is not None: pac = ndr_unpack(krb5pac.PAC_DATA, pac_data) # Modify the PAC here. if modify_pac_fn is not None: pac = modify_pac_fn(pac) if update_pac_checksums: # Get the enc-part with an empty PAC, which is needed # to create a ticket signature. enc_part_to_sign = enc_part.copy() enc_part_to_sign['authorization-data'] = ( empty_pac_auth_data) enc_part_to_sign = self.der_encode( enc_part_to_sign, asn1Spec=krb5_asn1.EncTicketPart()) self.update_pac_checksums(pac, checksum_keys, include_checksums, enc_part_to_sign) # Re-encode the PAC. pac_data = ndr_pack(pac) new_pac = self.AuthorizationData_create(AD_WIN2K_PAC, pac_data) # Replace the PAC in the authorization data and re-add it to the # ticket enc-part. auth_data, _ = self.replace_pac( auth_data, new_pac, expect_pac=expect_pac, allow_empty_authdata=allow_empty_authdata) enc_part['authorization-data'] = auth_data # Re-encrypt the ticket enc-part with the new key. enc_part_new = self.der_encode(enc_part, asn1Spec=krb5_asn1.EncTicketPart()) enc_part_new = self.EncryptedData_create(new_ticket_key, KU_TICKET, enc_part_new) # Create a copy of the ticket with the new enc-part. new_ticket = ticket.ticket.copy() new_ticket['enc-part'] = enc_part_new new_ticket_creds = KerberosTicketCreds( new_ticket, session_key=ticket.session_key, crealm=ticket.crealm, cname=ticket.cname, srealm=ticket.srealm, sname=ticket.sname, decryption_key=new_ticket_key, ticket_private=enc_part, encpart_private=ticket.encpart_private) return new_ticket_creds def update_pac_checksums(self, pac, checksum_keys, include_checksums, enc_part=None): pac_buffers = pac.buffers checksum_buffers = {} # Find the relevant PAC checksum buffers. for pac_buffer in pac_buffers: buffer_type = pac_buffer.type if buffer_type in self.pac_checksum_types: self.assertNotIn(buffer_type, checksum_buffers, f'Duplicate checksum type {buffer_type}') checksum_buffers[buffer_type] = pac_buffer # Create any additional buffers that were requested but not # present. Conversely, remove any buffers that were requested to be # removed. for buffer_type in self.pac_checksum_types: if buffer_type in checksum_buffers: if include_checksums.get(buffer_type) is False: checksum_buffer = checksum_buffers.pop(buffer_type) pac.num_buffers -= 1 pac_buffers.remove(checksum_buffer) elif include_checksums.get(buffer_type) is True: info = krb5pac.PAC_SIGNATURE_DATA() checksum_buffer = krb5pac.PAC_BUFFER() checksum_buffer.type = buffer_type checksum_buffer.info = info pac_buffers.append(checksum_buffer) pac.num_buffers += 1 checksum_buffers[buffer_type] = checksum_buffer # Fill the relevant checksum buffers. for buffer_type, checksum_buffer in checksum_buffers.items(): checksum_key = checksum_keys[buffer_type] ctype = checksum_key.ctype & ((1 << 32) - 1) if buffer_type == krb5pac.PAC_TYPE_TICKET_CHECKSUM: self.assertIsNotNone(enc_part) signature = checksum_key.make_rodc_checksum( KU_NON_KERB_CKSUM_SALT, enc_part) elif buffer_type == krb5pac.PAC_TYPE_SRV_CHECKSUM: signature = checksum_key.make_zeroed_checksum() else: signature = checksum_key.make_rodc_zeroed_checksum() checksum_buffer.info.signature = signature checksum_buffer.info.type = ctype # Add the new checksum buffers to the PAC. pac.buffers = pac_buffers # Calculate the full checksum and insert it into the PAC. full_checksum_buffer = checksum_buffers.get( krb5pac.PAC_TYPE_FULL_CHECKSUM) if full_checksum_buffer is not None: full_checksum_key = checksum_keys[krb5pac.PAC_TYPE_FULL_CHECKSUM] pac_data = ndr_pack(pac) full_checksum = full_checksum_key.make_checksum( KU_NON_KERB_CKSUM_SALT, pac_data) full_checksum_buffer.info.signature = full_checksum # Calculate the server and KDC checksums and insert them into the PAC. server_checksum_buffer = checksum_buffers.get( krb5pac.PAC_TYPE_SRV_CHECKSUM) if server_checksum_buffer is not None: server_checksum_key = checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM] pac_data = ndr_pack(pac) server_checksum = server_checksum_key.make_checksum( KU_NON_KERB_CKSUM_SALT, pac_data) server_checksum_buffer.info.signature = server_checksum kdc_checksum_buffer = checksum_buffers.get( krb5pac.PAC_TYPE_KDC_CHECKSUM) if kdc_checksum_buffer is not None: if server_checksum_buffer is None: # There's no server signature to make the checksum over, so # just make the checksum over an empty bytes object. server_checksum = bytes() kdc_checksum_key = checksum_keys[krb5pac.PAC_TYPE_KDC_CHECKSUM] kdc_checksum = kdc_checksum_key.make_rodc_checksum( KU_NON_KERB_CKSUM_SALT, server_checksum) kdc_checksum_buffer.info.signature = kdc_checksum def replace_pac(self, auth_data, new_pac, expect_pac=True, allow_empty_authdata=False): if new_pac is not None: self.assertElementEqual(new_pac, 'ad-type', AD_WIN2K_PAC) self.assertElementPresent(new_pac, 'ad-data') new_auth_data = [] ad_relevant = None old_pac = None for authdata_elem in auth_data: if authdata_elem['ad-type'] == AD_IF_RELEVANT: ad_relevant = self.der_decode( authdata_elem['ad-data'], asn1Spec=krb5_asn1.AD_IF_RELEVANT()) relevant_elems = [] for relevant_elem in ad_relevant: if relevant_elem['ad-type'] == AD_WIN2K_PAC: self.assertIsNone(old_pac, 'Multiple PACs detected') old_pac = relevant_elem['ad-data'] if new_pac is not None: relevant_elems.append(new_pac) else: relevant_elems.append(relevant_elem) if expect_pac: self.assertIsNotNone(old_pac, 'Expected PAC') if relevant_elems or allow_empty_authdata: ad_relevant = self.der_encode( relevant_elems, asn1Spec=krb5_asn1.AD_IF_RELEVANT()) authdata_elem = self.AuthorizationData_create( AD_IF_RELEVANT, ad_relevant) else: authdata_elem = None if authdata_elem is not None or allow_empty_authdata: new_auth_data.append(authdata_elem) if expect_pac: self.assertIsNotNone(ad_relevant, 'Expected AD-RELEVANT') return new_auth_data, old_pac def get_pac(self, auth_data, expect_pac=True): _, pac = self.replace_pac(auth_data, None, expect_pac) return pac def get_ticket_pac(self, ticket, expect_pac=True): auth_data = ticket.ticket_private.get('authorization-data') if expect_pac: self.assertIsNotNone(auth_data) elif auth_data is None: return None return self.get_pac(auth_data, expect_pac=expect_pac) def get_krbtgt_checksum_key(self): krbtgt_creds = self.get_krbtgt_creds() krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds) return { krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key } def is_tgs_principal(self, principal): if self.is_tgs(principal): return True if self.kadmin_is_tgs and self.is_kadmin(principal): return True return False def is_kadmin(self, principal): name = principal['name-string'][0] return name in ('kadmin', b'kadmin') def is_tgs(self, principal): name = principal['name-string'][0] return name in ('krbtgt', b'krbtgt') def is_tgt(self, ticket): sname = ticket.ticket['sname'] return self.is_tgs(sname) def get_empty_pac(self): return self.AuthorizationData_create(AD_WIN2K_PAC, bytes(1)) def get_outer_pa_dict(self, kdc_exchange_dict): return self.get_pa_dict(kdc_exchange_dict['req_padata']) def get_fast_pa_dict(self, kdc_exchange_dict): req_pa_dict = self.get_pa_dict(kdc_exchange_dict['fast_padata']) if req_pa_dict: return req_pa_dict return self.get_outer_pa_dict(kdc_exchange_dict) def sent_fast(self, kdc_exchange_dict): outer_pa_dict = self.get_outer_pa_dict(kdc_exchange_dict) return PADATA_FX_FAST in outer_pa_dict def sent_enc_challenge(self, kdc_exchange_dict): fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict) return PADATA_ENCRYPTED_CHALLENGE in fast_pa_dict def sent_enc_pa_rep(self, kdc_exchange_dict): fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict) return PADATA_REQ_ENC_PA_REP in fast_pa_dict def get_sent_pac_options(self, kdc_exchange_dict): fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict) if PADATA_PAC_OPTIONS not in fast_pa_dict: return '' pac_options = self.der_decode(fast_pa_dict[PADATA_PAC_OPTIONS], asn1Spec=krb5_asn1.PA_PAC_OPTIONS()) pac_options = pac_options['options'] # Mask out unsupported bits. pac_options, remaining = pac_options[:4], pac_options[4:] pac_options += '0' * len(remaining) return pac_options def get_krbtgt_sname(self): krbtgt_creds = self.get_krbtgt_creds() krbtgt_username = krbtgt_creds.get_username() krbtgt_realm = krbtgt_creds.get_realm() krbtgt_sname = self.PrincipalName_create( name_type=NT_SRV_INST, names=[krbtgt_username, krbtgt_realm]) return krbtgt_sname def _test_as_exchange(self, cname, realm, sname, till, client_as_etypes, expected_error_mode, expected_crealm, expected_cname, expected_srealm, expected_sname, expected_salt, etypes, padata, kdc_options, renew_time=None, expected_account_name=None, expected_groups=None, unexpected_groups=None, expected_upn_name=None, expected_sid=None, expected_domain_sid=None, expected_flags=None, unexpected_flags=None, expected_supported_etypes=None, preauth_key=None, ticket_decryption_key=None, pac_request=None, pac_options=None, expect_pac=True, expect_pac_attrs=None, expect_pac_attrs_pac_request=None, expect_requester_sid=None, expect_client_claims=None, expect_device_claims=None, expected_client_claims=None, unexpected_client_claims=None, expected_device_claims=None, unexpected_device_claims=None, expect_edata=None, rc4_support=True, to_rodc=False): def _generate_padata_copy(_kdc_exchange_dict, _callback_dict, req_body): return padata, req_body if not expected_error_mode: check_error_fn = None check_rep_fn = self.generic_check_kdc_rep else: check_error_fn = self.generic_check_kdc_error check_rep_fn = None if padata is not None: generate_padata_fn = _generate_padata_copy else: generate_padata_fn = None kdc_exchange_dict = self.as_exchange_dict( expected_crealm=expected_crealm, expected_cname=expected_cname, expected_srealm=expected_srealm, expected_sname=expected_sname, expected_account_name=expected_account_name, expected_groups=expected_groups, unexpected_groups=unexpected_groups, expected_upn_name=expected_upn_name, expected_sid=expected_sid, expected_domain_sid=expected_domain_sid, expected_supported_etypes=expected_supported_etypes, ticket_decryption_key=ticket_decryption_key, generate_padata_fn=generate_padata_fn, check_error_fn=check_error_fn, check_rep_fn=check_rep_fn, check_kdc_private_fn=self.generic_check_kdc_private, expected_error_mode=expected_error_mode, client_as_etypes=client_as_etypes, expected_salt=expected_salt, expected_flags=expected_flags, unexpected_flags=unexpected_flags, preauth_key=preauth_key, kdc_options=str(kdc_options), pac_request=pac_request, pac_options=pac_options, expect_pac=expect_pac, expect_pac_attrs=expect_pac_attrs, expect_pac_attrs_pac_request=expect_pac_attrs_pac_request, expect_requester_sid=expect_requester_sid, expect_client_claims=expect_client_claims, expect_device_claims=expect_device_claims, expected_client_claims=expected_client_claims, unexpected_client_claims=unexpected_client_claims, expected_device_claims=expected_device_claims, unexpected_device_claims=unexpected_device_claims, expect_edata=expect_edata, rc4_support=rc4_support, to_rodc=to_rodc) rep = self._generic_kdc_exchange(kdc_exchange_dict, cname=cname, realm=realm, sname=sname, till_time=till, renew_time=renew_time, etypes=etypes) return rep, kdc_exchange_dict