diff options
Diffstat (limited to 'python/samba/tests')
-rw-r--r-- | python/samba/tests/krb5/raw_testcase.py | 44 |
1 files changed, 27 insertions, 17 deletions
diff --git a/python/samba/tests/krb5/raw_testcase.py b/python/samba/tests/krb5/raw_testcase.py index 421143781ae..2aed5530455 100644 --- a/python/samba/tests/krb5/raw_testcase.py +++ b/python/samba/tests/krb5/raw_testcase.py @@ -920,24 +920,28 @@ class RawKerberosTest(TestCaseInTempDir): return blob def send_pdu(self, req, asn1_print=None, hexdump=None): + k5_pdu = self.der_encode( + req, native_decode=False, asn1_print=asn1_print, hexdump=False) + self.send_msg(k5_pdu, hexdump=hexdump) + + def send_msg(self, msg, hexdump=None): + header = struct.pack('>I', len(msg)) + req_pdu = header + req_pdu += msg + self.hex_dump("send_msg", header, hexdump=hexdump) + self.hex_dump("send_msg", msg, hexdump=hexdump) + try: - k5_pdu = self.der_encode( - req, native_decode=False, asn1_print=asn1_print, hexdump=False) - header = struct.pack('>I', len(k5_pdu)) - req_pdu = header - req_pdu += k5_pdu - self.hex_dump("send_pdu", header, hexdump=hexdump) - self.hex_dump("send_pdu", k5_pdu, hexdump=hexdump) while True: sent = self.s.send(req_pdu, 0) if sent == len(req_pdu): - break + return req_pdu = req_pdu[sent:] except socket.error as e: - self._disconnect("send_pdu: %s" % e) + self._disconnect("send_msg: %s" % e) raise except IOError as e: - self._disconnect("send_pdu: %s" % e) + self._disconnect("send_msg: %s" % e) raise def recv_raw(self, num_recv=0xffff, hexdump=None, timeout=None): @@ -963,16 +967,14 @@ class RawKerberosTest(TestCaseInTempDir): return rep_pdu def recv_pdu_raw(self, asn1_print=None, hexdump=None, timeout=None): - rep_pdu = None - rep = None raw_pdu = self.recv_raw( num_recv=4, hexdump=hexdump, timeout=timeout) if raw_pdu is None: - return (None, None) + return None header = struct.unpack(">I", raw_pdu[0:4]) k5_len = header[0] if k5_len == 0: - return (None, "") + return "" missing = k5_len rep_pdu = b'' while missing > 0: @@ -981,6 +983,14 @@ class RawKerberosTest(TestCaseInTempDir): self.assertGreaterEqual(len(raw_pdu), 1) rep_pdu += raw_pdu missing = k5_len - len(rep_pdu) + return rep_pdu + + def recv_reply(self, asn1_print=None, hexdump=None, timeout=None): + rep_pdu = self.recv_pdu_raw(asn1_print=asn1_print, + hexdump=hexdump, + timeout=timeout) + if not rep_pdu: + return None, rep_pdu k5_raw = self.der_decode( rep_pdu, asn1Spec=None, @@ -1002,9 +1012,9 @@ class RawKerberosTest(TestCaseInTempDir): return (rep, rep_pdu) def recv_pdu(self, asn1_print=None, hexdump=None, timeout=None): - (rep, rep_pdu) = self.recv_pdu_raw(asn1_print=asn1_print, - hexdump=hexdump, - timeout=timeout) + (rep, rep_pdu) = self.recv_reply(asn1_print=asn1_print, + hexdump=hexdump, + timeout=timeout) return rep def assertIsConnected(self): |