From 668825ad56ff70715c626bc3209a6868409e4969 Mon Sep 17 00:00:00 2001 From: Joseph Sutton Date: Tue, 24 May 2022 19:57:57 +1200 Subject: CVE-2022-2031 tests/krb5: Add kpasswd_exchange() method Now we can test the kpasswd service from Python. BUG: https://bugzilla.samba.org/show_bug.cgi?id=15047 BUG: https://bugzilla.samba.org/show_bug.cgi?id=15049 BUG: https://bugzilla.samba.org/show_bug.cgi?id=15074 Signed-off-by: Joseph Sutton Reviewed-by: Andreas Schneider [jsutton@samba.org Fixed conflicts in imports] --- python/samba/tests/krb5/raw_testcase.py | 264 ++++++++++++++++++++++++++++++-- 1 file changed, 251 insertions(+), 13 deletions(-) diff --git a/python/samba/tests/krb5/raw_testcase.py b/python/samba/tests/krb5/raw_testcase.py index 2aed5530455..57010ae73bd 100644 --- a/python/samba/tests/krb5/raw_testcase.py +++ b/python/samba/tests/krb5/raw_testcase.py @@ -26,6 +26,8 @@ import binascii import itertools import collections +from enum import Enum + from pyasn1.codec.der.decoder import decode as pyasn1_der_decode from pyasn1.codec.der.encoder import encode as pyasn1_der_encode from pyasn1.codec.native.decoder import decode as pyasn1_native_decode @@ -33,6 +35,8 @@ from pyasn1.codec.native.encoder import encode as pyasn1_native_encode from pyasn1.codec.ber.encoder import BitStringEncoder +from pyasn1.error import PyAsn1Error + from samba.credentials import Credentials from samba.dcerpc import krb5pac, security from samba.gensec import FEATURE_SEAL @@ -50,6 +54,7 @@ from samba.tests.krb5.rfc4120_constants import ( KDC_ERR_PREAUTH_FAILED, KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS, KERB_ERR_TYPE_EXTENDED, + KRB_AP_REP, KRB_AP_REQ, KRB_AS_REP, KRB_AS_REQ, @@ -59,6 +64,7 @@ from samba.tests.krb5.rfc4120_constants import ( KRB_TGS_REQ, KU_AP_REQ_AUTH, KU_AS_REP_ENC_PART, + KU_AP_REQ_ENC_PART, KU_ENC_CHALLENGE_KDC, KU_FAST_ENC, KU_FAST_FINISHED, @@ -73,6 +79,7 @@ from samba.tests.krb5.rfc4120_constants import ( KU_TGS_REQ_AUTH_DAT_SESSION, KU_TGS_REQ_AUTH_DAT_SUBKEY, KU_TICKET, + NT_PRINCIPAL, NT_SRV_INST, NT_WELLKNOWN, PADATA_ENCRYPTED_CHALLENGE, @@ -515,6 +522,10 @@ class KerberosTicketCreds: class RawKerberosTest(TestCaseInTempDir): """A raw Kerberos Test case.""" + class KpasswdMode(Enum): + SET = object() + CHANGE = object() + pac_checksum_types = {krb5pac.PAC_TYPE_SRV_CHECKSUM, krb5pac.PAC_TYPE_KDC_CHECKSUM, krb5pac.PAC_TYPE_TICKET_CHECKSUM} @@ -1886,6 +1897,224 @@ class RawKerberosTest(TestCaseInTempDir): return msg + def get_enc_part(self, obj, key, usage): + self.assertElementEqual(obj, 'pvno', 5) + + enc_part = obj['enc-part'] + self.assertElementEqual(enc_part, 'etype', key.etype) + self.assertElementKVNO(enc_part, 'kvno', key.kvno) + + enc_part = key.decrypt(usage, enc_part['cipher']) + + return enc_part + + def kpasswd_exchange(self, + ticket, + new_password, + expected_code, + expected_msg, + mode, + target_princ=None, + target_realm=None, + ap_options=None, + send_seq_number=True): + if mode is self.KpasswdMode.SET: + version = 0xff80 + user_data = self.ChangePasswdDataMS_create(new_password, + target_princ, + target_realm) + elif mode is self.KpasswdMode.CHANGE: + self.assertIsNone(target_princ, + 'target_princ only valid for pw set') + self.assertIsNone(target_realm, + 'target_realm only valid for pw set') + + version = 1 + user_data = new_password.encode('utf-8') + else: + self.fail(f'invalid mode {mode}') + + subkey = self.RandomKey(kcrypto.Enctype.AES256) + + if ap_options is None: + ap_options = '0' + ap_options = str(krb5_asn1.APOptions(ap_options)) + + kdc_exchange_dict = { + 'tgt': ticket, + 'authenticator_subkey': subkey, + 'auth_data': None, + 'ap_options': ap_options, + } + + if send_seq_number: + seq_number = random.randint(0, 0xfffffffe) + else: + seq_number = None + + ap_req = self.generate_ap_req(kdc_exchange_dict, + None, + req_body=None, + armor=False, + usage=KU_AP_REQ_AUTH, + seq_number=seq_number) + + self.connect(self.host, port=464) + self.assertIsNotNone(self.s) + + family = self.s.family + + if family == socket.AF_INET: + addr_type = 2 # IPv4 + elif family == socket.AF_INET6: + addr_type = 24 # IPv6 + else: + self.fail(f'unknown family {family}') + + def create_address(ip): + return { + 'addr-type': addr_type, + 'address': socket.inet_pton(family, ip), + } + + local_ip = self.s.getsockname()[0] + local_address = create_address(local_ip) + + # remote_ip = self.s.getpeername()[0] + # remote_address = create_address(remote_ip) + + # TODO: due to a bug (?), MIT Kerberos will not accept the request + # unless r-address is set to our _local_ address. Heimdal, on the other + # hand, requires the r-address is set to the remote address (as + # expected). To avoid problems, avoid sending r-address for now. + remote_address = None + + msg = self.kpasswd_create(subkey, + user_data, + version, + seq_number, + ap_req, + local_address, + remote_address) + + self.send_msg(msg) + rep_pdu = self.recv_pdu_raw() + + self._disconnect('transaction done') + + self.assertIsNotNone(rep_pdu) + + header = rep_pdu[:6] + reply = rep_pdu[6:] + + reply_len = (header[0] << 8) | header[1] + reply_version = (header[2] << 8) | header[3] + ap_rep_len = (header[4] << 8) | header[5] + + self.assertEqual(reply_len, len(rep_pdu)) + self.assertEqual(1, reply_version) # KRB5_KPASSWD_VERS_CHANGEPW + self.assertLess(ap_rep_len, reply_len) + + self.assertNotEqual(0x7e, rep_pdu[1]) + self.assertNotEqual(0x5e, rep_pdu[1]) + + if ap_rep_len: + # We received an AP-REQ and KRB-PRIV as a response. This may or may + # not indicate an error, depending on the status code. + ap_rep = reply[:ap_rep_len] + krb_priv = reply[ap_rep_len:] + + key = ticket.session_key + + ap_rep = self.der_decode(ap_rep, asn1Spec=krb5_asn1.AP_REP()) + self.assertElementEqual(ap_rep, 'msg-type', KRB_AP_REP) + enc_part = self.get_enc_part(ap_rep, key, KU_AP_REQ_ENC_PART) + enc_part = self.der_decode( + enc_part, asn1Spec=krb5_asn1.EncAPRepPart()) + + self.assertElementPresent(enc_part, 'ctime') + self.assertElementPresent(enc_part, 'cusec') + # self.assertElementMissing(enc_part, 'subkey') # TODO + # self.assertElementPresent(enc_part, 'seq-number') # TODO + + try: + krb_priv = self.der_decode(krb_priv, asn1Spec=krb5_asn1.KRB_PRIV()) + except PyAsn1Error: + self.fail() + + self.assertElementEqual(krb_priv, 'msg-type', KRB_PRIV) + priv_enc_part = self.get_enc_part(krb_priv, subkey, KU_KRB_PRIV) + priv_enc_part = self.der_decode( + priv_enc_part, asn1Spec=krb5_asn1.EncKrbPrivPart()) + + self.assertElementMissing(priv_enc_part, 'timestamp') + self.assertElementMissing(priv_enc_part, 'usec') + # self.assertElementPresent(priv_enc_part, 'seq-number') # TODO + # self.assertElementEqual(priv_enc_part, 's-address', remote_address) # TODO + # self.assertElementMissing(priv_enc_part, 'r-address') # TODO + + result_data = priv_enc_part['user-data'] + else: + # We received a KRB-ERROR as a response, indicating an error. + krb_error = self.der_decode(reply, asn1Spec=krb5_asn1.KRB_ERROR()) + + sname = self.PrincipalName_create( + name_type=NT_PRINCIPAL, + names=['kadmin', 'changepw']) + realm = self.get_krbtgt_creds().get_realm().upper() + + self.assertElementEqual(krb_error, 'pvno', 5) + self.assertElementEqual(krb_error, 'msg-type', KRB_ERROR) + self.assertElementMissing(krb_error, 'ctime') + self.assertElementMissing(krb_error, 'usec') + self.assertElementPresent(krb_error, 'stime') + self.assertElementPresent(krb_error, 'susec') + + error_code = krb_error['error-code'] + if isinstance(expected_code, int): + self.assertEqual(error_code, expected_code) + else: + self.assertIn(error_code, expected_code) + + self.assertElementMissing(krb_error, 'crealm') + self.assertElementMissing(krb_error, 'cname') + self.assertElementEqual(krb_error, 'realm', realm.encode('utf-8')) + self.assertElementEqualPrincipal(krb_error, 'sname', sname) + self.assertElementMissing(krb_error, 'e-text') + + result_data = krb_error['e-data'] + + status = result_data[:2] + message = result_data[2:] + + status_code = (status[0] << 8) | status[1] + if isinstance(expected_code, int): + self.assertEqual(status_code, expected_code) + else: + self.assertIn(status_code, expected_code) + + if not message: + self.assertEqual(0, status_code, + 'got an error result, but no message') + return + + # Check the first character of the message. + if message[0]: + if isinstance(expected_msg, bytes): + self.assertEqual(message, expected_msg) + else: + self.assertIn(message, expected_msg) + else: + # We got AD password policy information. + self.assertEqual(30, len(message)) + + (empty_bytes, + min_length, + history_length, + properties, + expire_time, + min_age) = struct.unpack('>HIIIQQ', message) + def _generic_kdc_exchange(self, kdc_exchange_dict, # required cname=None, # optional @@ -1996,7 +2225,7 @@ class RawKerberosTest(TestCaseInTempDir): self.assertIsNotNone(generate_fast_fn) fast_ap_req = generate_fast_armor_fn(kdc_exchange_dict, callback_dict, - req_body, + None, armor=True) fast_armor_type = kdc_exchange_dict['fast_armor_type'] @@ -3211,31 +3440,39 @@ class RawKerberosTest(TestCaseInTempDir): kdc_exchange_dict, _callback_dict, req_body, - armor): + armor, + usage=None, + seq_number=None): + req_body_checksum = None + if armor: + self.assertIsNone(req_body) + tgt = kdc_exchange_dict['armor_tgt'] authenticator_subkey = kdc_exchange_dict['armor_subkey'] - - req_body_checksum = None else: tgt = kdc_exchange_dict['tgt'] authenticator_subkey = kdc_exchange_dict['authenticator_subkey'] - body_checksum_type = kdc_exchange_dict['body_checksum_type'] - req_body_blob = self.der_encode(req_body, - asn1Spec=krb5_asn1.KDC_REQ_BODY()) + if req_body is not None: + body_checksum_type = kdc_exchange_dict['body_checksum_type'] + + req_body_blob = self.der_encode( + req_body, asn1Spec=krb5_asn1.KDC_REQ_BODY()) - req_body_checksum = self.Checksum_create(tgt.session_key, - KU_TGS_REQ_AUTH_CKSUM, - req_body_blob, - ctype=body_checksum_type) + req_body_checksum = self.Checksum_create( + tgt.session_key, + KU_TGS_REQ_AUTH_CKSUM, + req_body_blob, + ctype=body_checksum_type) auth_data = kdc_exchange_dict['auth_data'] subkey_obj = None if authenticator_subkey is not None: subkey_obj = authenticator_subkey.export_obj() - seq_number = random.randint(0, 0xfffffffe) + if seq_number is None: + seq_number = random.randint(0, 0xfffffffe) (ctime, cusec) = self.get_KerberosTimeWithUsec() authenticator_obj = self.Authenticator_create( crealm=tgt.crealm, @@ -3250,7 +3487,8 @@ class RawKerberosTest(TestCaseInTempDir): authenticator_obj, asn1Spec=krb5_asn1.Authenticator()) - usage = KU_AP_REQ_AUTH if armor else KU_TGS_REQ_AUTH + if usage is None: + usage = KU_AP_REQ_AUTH if armor else KU_TGS_REQ_AUTH authenticator = self.EncryptedData_create(tgt.session_key, usage, authenticator_blob) -- cgit v1.2.1