From 13fe7e013eccca2c86258084f4443ddb7abaf089 Mon Sep 17 00:00:00 2001 From: Joseph Sutton Date: Tue, 24 May 2022 19:20:28 +1200 Subject: CVE-2022-2031 tests/krb5: Add methods to send and receive generic messages This allows us to send and receive kpasswd messages, while avoiding the existing logic for encoding and decoding other Kerberos message types. BUG: https://bugzilla.samba.org/show_bug.cgi?id=15047 BUG: https://bugzilla.samba.org/show_bug.cgi?id=15049 BUG: https://bugzilla.samba.org/show_bug.cgi?id=15074 Signed-off-by: Joseph Sutton Reviewed-by: Andreas Schneider --- python/samba/tests/krb5/raw_testcase.py | 44 ++++++++++++++++++++------------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/python/samba/tests/krb5/raw_testcase.py b/python/samba/tests/krb5/raw_testcase.py index 421143781ae..2aed5530455 100644 --- a/python/samba/tests/krb5/raw_testcase.py +++ b/python/samba/tests/krb5/raw_testcase.py @@ -920,24 +920,28 @@ class RawKerberosTest(TestCaseInTempDir): return blob def send_pdu(self, req, asn1_print=None, hexdump=None): + k5_pdu = self.der_encode( + req, native_decode=False, asn1_print=asn1_print, hexdump=False) + self.send_msg(k5_pdu, hexdump=hexdump) + + def send_msg(self, msg, hexdump=None): + header = struct.pack('>I', len(msg)) + req_pdu = header + req_pdu += msg + self.hex_dump("send_msg", header, hexdump=hexdump) + self.hex_dump("send_msg", msg, hexdump=hexdump) + try: - k5_pdu = self.der_encode( - req, native_decode=False, asn1_print=asn1_print, hexdump=False) - header = struct.pack('>I', len(k5_pdu)) - req_pdu = header - req_pdu += k5_pdu - self.hex_dump("send_pdu", header, hexdump=hexdump) - self.hex_dump("send_pdu", k5_pdu, hexdump=hexdump) while True: sent = self.s.send(req_pdu, 0) if sent == len(req_pdu): - break + return req_pdu = req_pdu[sent:] except socket.error as e: - self._disconnect("send_pdu: %s" % e) + self._disconnect("send_msg: %s" % e) raise except IOError as e: - self._disconnect("send_pdu: %s" % e) + self._disconnect("send_msg: %s" % e) raise def recv_raw(self, num_recv=0xffff, hexdump=None, timeout=None): @@ -963,16 +967,14 @@ class RawKerberosTest(TestCaseInTempDir): return rep_pdu def recv_pdu_raw(self, asn1_print=None, hexdump=None, timeout=None): - rep_pdu = None - rep = None raw_pdu = self.recv_raw( num_recv=4, hexdump=hexdump, timeout=timeout) if raw_pdu is None: - return (None, None) + return None header = struct.unpack(">I", raw_pdu[0:4]) k5_len = header[0] if k5_len == 0: - return (None, "") + return "" missing = k5_len rep_pdu = b'' while missing > 0: @@ -981,6 +983,14 @@ class RawKerberosTest(TestCaseInTempDir): self.assertGreaterEqual(len(raw_pdu), 1) rep_pdu += raw_pdu missing = k5_len - len(rep_pdu) + return rep_pdu + + def recv_reply(self, asn1_print=None, hexdump=None, timeout=None): + rep_pdu = self.recv_pdu_raw(asn1_print=asn1_print, + hexdump=hexdump, + timeout=timeout) + if not rep_pdu: + return None, rep_pdu k5_raw = self.der_decode( rep_pdu, asn1Spec=None, @@ -1002,9 +1012,9 @@ class RawKerberosTest(TestCaseInTempDir): return (rep, rep_pdu) def recv_pdu(self, asn1_print=None, hexdump=None, timeout=None): - (rep, rep_pdu) = self.recv_pdu_raw(asn1_print=asn1_print, - hexdump=hexdump, - timeout=timeout) + (rep, rep_pdu) = self.recv_reply(asn1_print=asn1_print, + hexdump=hexdump, + timeout=timeout) return rep def assertIsConnected(self): -- cgit v1.2.1