summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--python/samba/tests/krb5/raw_testcase.py44
1 files changed, 27 insertions, 17 deletions
diff --git a/python/samba/tests/krb5/raw_testcase.py b/python/samba/tests/krb5/raw_testcase.py
index 421143781ae..2aed5530455 100644
--- a/python/samba/tests/krb5/raw_testcase.py
+++ b/python/samba/tests/krb5/raw_testcase.py
@@ -920,24 +920,28 @@ class RawKerberosTest(TestCaseInTempDir):
return blob
def send_pdu(self, req, asn1_print=None, hexdump=None):
+ k5_pdu = self.der_encode(
+ req, native_decode=False, asn1_print=asn1_print, hexdump=False)
+ self.send_msg(k5_pdu, hexdump=hexdump)
+
+ def send_msg(self, msg, hexdump=None):
+ header = struct.pack('>I', len(msg))
+ req_pdu = header
+ req_pdu += msg
+ self.hex_dump("send_msg", header, hexdump=hexdump)
+ self.hex_dump("send_msg", msg, hexdump=hexdump)
+
try:
- k5_pdu = self.der_encode(
- req, native_decode=False, asn1_print=asn1_print, hexdump=False)
- header = struct.pack('>I', len(k5_pdu))
- req_pdu = header
- req_pdu += k5_pdu
- self.hex_dump("send_pdu", header, hexdump=hexdump)
- self.hex_dump("send_pdu", k5_pdu, hexdump=hexdump)
while True:
sent = self.s.send(req_pdu, 0)
if sent == len(req_pdu):
- break
+ return
req_pdu = req_pdu[sent:]
except socket.error as e:
- self._disconnect("send_pdu: %s" % e)
+ self._disconnect("send_msg: %s" % e)
raise
except IOError as e:
- self._disconnect("send_pdu: %s" % e)
+ self._disconnect("send_msg: %s" % e)
raise
def recv_raw(self, num_recv=0xffff, hexdump=None, timeout=None):
@@ -963,16 +967,14 @@ class RawKerberosTest(TestCaseInTempDir):
return rep_pdu
def recv_pdu_raw(self, asn1_print=None, hexdump=None, timeout=None):
- rep_pdu = None
- rep = None
raw_pdu = self.recv_raw(
num_recv=4, hexdump=hexdump, timeout=timeout)
if raw_pdu is None:
- return (None, None)
+ return None
header = struct.unpack(">I", raw_pdu[0:4])
k5_len = header[0]
if k5_len == 0:
- return (None, "")
+ return ""
missing = k5_len
rep_pdu = b''
while missing > 0:
@@ -981,6 +983,14 @@ class RawKerberosTest(TestCaseInTempDir):
self.assertGreaterEqual(len(raw_pdu), 1)
rep_pdu += raw_pdu
missing = k5_len - len(rep_pdu)
+ return rep_pdu
+
+ def recv_reply(self, asn1_print=None, hexdump=None, timeout=None):
+ rep_pdu = self.recv_pdu_raw(asn1_print=asn1_print,
+ hexdump=hexdump,
+ timeout=timeout)
+ if not rep_pdu:
+ return None, rep_pdu
k5_raw = self.der_decode(
rep_pdu,
asn1Spec=None,
@@ -1002,9 +1012,9 @@ class RawKerberosTest(TestCaseInTempDir):
return (rep, rep_pdu)
def recv_pdu(self, asn1_print=None, hexdump=None, timeout=None):
- (rep, rep_pdu) = self.recv_pdu_raw(asn1_print=asn1_print,
- hexdump=hexdump,
- timeout=timeout)
+ (rep, rep_pdu) = self.recv_reply(asn1_print=asn1_print,
+ hexdump=hexdump,
+ timeout=timeout)
return rep
def assertIsConnected(self):