summaryrefslogtreecommitdiff
path: root/tests/python_dependencies/impacket/spnego.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/python_dependencies/impacket/spnego.py')
-rw-r--r--tests/python_dependencies/impacket/spnego.py372
1 files changed, 372 insertions, 0 deletions
diff --git a/tests/python_dependencies/impacket/spnego.py b/tests/python_dependencies/impacket/spnego.py
new file mode 100644
index 000000000..f177d18f7
--- /dev/null
+++ b/tests/python_dependencies/impacket/spnego.py
@@ -0,0 +1,372 @@
+# Copyright (c) 2003-2016 CORE Security Technologies
+#
+# This software is provided under under a slightly modified version
+# of the Apache Software License. See the accompanying LICENSE file
+# for more information.
+#
+# Author: Alberto Solino (beto@coresecurity.com)
+#
+# Description:
+# SPNEGO functions used by SMB, SMB2/3 and DCERPC
+#
+
+from struct import pack, unpack, calcsize
+
+############### GSS Stuff ################
+GSS_API_SPNEGO_UUID = '\x2b\x06\x01\x05\x05\x02'
+ASN1_SEQUENCE = 0x30
+ASN1_AID = 0x60
+ASN1_OID = 0x06
+ASN1_OCTET_STRING = 0x04
+ASN1_MECH_TYPE = 0xa0
+ASN1_MECH_TOKEN = 0xa2
+ASN1_SUPPORTED_MECH = 0xa1
+ASN1_RESPONSE_TOKEN = 0xa2
+ASN1_ENUMERATED = 0x0a
+MechTypes = {
+'+\x06\x01\x04\x01\x827\x02\x02\x1e': 'SNMPv2-SMI::enterprises.311.2.2.30',
+'+\x06\x01\x04\x01\x827\x02\x02\n': 'NTLMSSP - Microsoft NTLM Security Support Provider',
+'*\x86H\x82\xf7\x12\x01\x02\x02': 'MS KRB5 - Microsoft Kerberos 5',
+'*\x86H\x86\xf7\x12\x01\x02\x02': 'KRB5 - Kerberos 5',
+'*\x86H\x86\xf7\x12\x01\x02\x02\x03': 'KRB5 - Kerberos 5 - User to User'
+}
+TypesMech = dict((v,k) for k, v in MechTypes.iteritems())
+
+def asn1encode(data = ''):
+ #res = asn1.SEQUENCE(str).encode()
+ #import binascii
+ #print '\nalex asn1encode str: %s\n' % binascii.hexlify(str)
+ if 0 <= len(data) <= 0x7F:
+ res = pack('B', len(data)) + data
+ elif 0x80 <= len(data) <= 0xFF:
+ res = pack('BB', 0x81, len(data)) + data
+ elif 0x100 <= len(data) <= 0xFFFF:
+ res = pack('!BH', 0x82, len(data)) + data
+ elif 0x10000 <= len(data) <= 0xffffff:
+ res = pack('!BBH', 0x83, len(data) >> 16, len(data) & 0xFFFF) + data
+ elif 0x1000000 <= len(data) <= 0xffffffff:
+ res = pack('!BL', 0x84, len(data)) + data
+ else:
+ raise Exception('Error in asn1encode')
+ return str(res)
+
+def asn1decode(data = ''):
+ len1 = unpack('B', data[:1])[0]
+ data = data[1:]
+ if len1 == 0x81:
+ pad = calcsize('B')
+ len2 = unpack('B',data[:pad])[0]
+ data = data[pad:]
+ ans = data[:len2]
+ elif len1 == 0x82:
+ pad = calcsize('H')
+ len2 = unpack('!H', data[:pad])[0]
+ data = data[pad:]
+ ans = data[:len2]
+ elif len1 == 0x83:
+ pad = calcsize('B') + calcsize('!H')
+ len2, len3 = unpack('!BH', data[:pad])
+ data = data[pad:]
+ ans = data[:len2 << 16 + len3]
+ elif len1 == 0x84:
+ pad = calcsize('!L')
+ len2 = unpack('!L', data[:pad])[0]
+ data = data[pad:]
+ ans = data[:len2]
+ # 1 byte length, string <= 0x7F
+ else:
+ pad = 0
+ ans = data[:len1]
+ return ans, len(ans)+pad+1
+
+class GSSAPI:
+# Generic GSSAPI Header Format
+ def __init__(self, data = None):
+ self.fields = {}
+ self['UUID'] = GSS_API_SPNEGO_UUID
+ if data:
+ self.fromString(data)
+ pass
+
+ def __setitem__(self,key,value):
+ self.fields[key] = value
+
+ def __getitem__(self, key):
+ return self.fields[key]
+
+ def __delitem__(self, key):
+ del self.fields[key]
+
+ def __len__(self):
+ return len(self.getData())
+
+ def __str__(self):
+ return len(self.getData())
+
+ def fromString(self, data = None):
+ # Manual parse of the GSSAPI Header Format
+ # It should be something like
+ # AID = 0x60 TAG, BER Length
+ # OID = 0x06 TAG
+ # GSSAPI OID
+ # UUID data (BER Encoded)
+ # Payload
+ next_byte = unpack('B',data[:1])[0]
+ if next_byte != ASN1_AID:
+ raise Exception('Unknown AID=%x' % next_byte)
+ data = data[1:]
+ decode_data, total_bytes = asn1decode(data)
+ # Now we should have a OID tag
+ next_byte = unpack('B',decode_data[:1])[0]
+ if next_byte != ASN1_OID:
+ raise Exception('OID tag not found %x' % next_byte)
+ decode_data = decode_data[1:]
+ # Now the OID contents, should be SPNEGO UUID
+ uuid, total_bytes = asn1decode(decode_data)
+ self['OID'] = uuid
+ # the rest should be the data
+ self['Payload'] = decode_data[total_bytes:]
+ #pass
+
+ def dump(self):
+ for i in self.fields.keys():
+ print "%s: {%r}" % (i,self[i])
+
+ def getData(self):
+ ans = pack('B',ASN1_AID)
+ ans += asn1encode(
+ pack('B',ASN1_OID) +
+ asn1encode(self['UUID']) +
+ self['Payload'] )
+ return ans
+
+class SPNEGO_NegTokenResp:
+ # http://tools.ietf.org/html/rfc4178#page-9
+ # NegTokenResp ::= SEQUENCE {
+ # negState [0] ENUMERATED {
+ # accept-completed (0),
+ # accept-incomplete (1),
+ # reject (2),
+ # request-mic (3)
+ # } OPTIONAL,
+ # -- REQUIRED in the first reply from the target
+ # supportedMech [1] MechType OPTIONAL,
+ # -- present only in the first reply from the target
+ # responseToken [2] OCTET STRING OPTIONAL,
+ # mechListMIC [3] OCTET STRING OPTIONAL,
+ # ...
+ # }
+ # This structure is not prepended by a GSS generic header!
+ SPNEGO_NEG_TOKEN_RESP = 0xa1
+ SPNEGO_NEG_TOKEN_TARG = 0xa0
+
+ def __init__(self, data = None):
+ self.fields = {}
+ if data:
+ self.fromString(data)
+ pass
+
+ def __setitem__(self,key,value):
+ self.fields[key] = value
+
+ def __getitem__(self, key):
+ return self.fields[key]
+
+ def __delitem__(self, key):
+ del self.fields[key]
+
+ def __len__(self):
+ return len(self.getData())
+
+ def __str__(self):
+ return len(self.getData())
+
+ def fromString(self, data = 0):
+ payload = data
+ next_byte = unpack('B', payload[:1])[0]
+ if next_byte != SPNEGO_NegTokenResp.SPNEGO_NEG_TOKEN_RESP:
+ raise Exception('NegTokenResp not found %x' % next_byte)
+ payload = payload[1:]
+ decode_data, total_bytes = asn1decode(payload)
+ next_byte = unpack('B', decode_data[:1])[0]
+ if next_byte != ASN1_SEQUENCE:
+ raise Exception('SEQUENCE tag not found %x' % next_byte)
+ decode_data = decode_data[1:]
+ decode_data, total_bytes = asn1decode(decode_data)
+ next_byte = unpack('B',decode_data[:1])[0]
+
+ if next_byte != ASN1_MECH_TYPE:
+ # MechType not found, could be an AUTH answer
+ if next_byte != ASN1_RESPONSE_TOKEN:
+ raise Exception('MechType/ResponseToken tag not found %x' % next_byte)
+ else:
+ decode_data2 = decode_data[1:]
+ decode_data2, total_bytes = asn1decode(decode_data2)
+ next_byte = unpack('B', decode_data2[:1])[0]
+ if next_byte != ASN1_ENUMERATED:
+ raise Exception('Enumerated tag not found %x' % next_byte)
+ item, total_bytes2 = asn1decode(decode_data)
+ self['NegResult'] = item
+ decode_data = decode_data[1:]
+ decode_data = decode_data[total_bytes:]
+
+ # Do we have more data?
+ if len(decode_data) == 0:
+ return
+
+ next_byte = unpack('B', decode_data[:1])[0]
+ if next_byte != ASN1_SUPPORTED_MECH:
+ if next_byte != ASN1_RESPONSE_TOKEN:
+ raise Exception('Supported Mech/ResponseToken tag not found %x' % next_byte)
+ else:
+ decode_data2 = decode_data[1:]
+ decode_data2, total_bytes = asn1decode(decode_data2)
+ next_byte = unpack('B', decode_data2[:1])[0]
+ if next_byte != ASN1_OID:
+ raise Exception('OID tag not found %x' % next_byte)
+ decode_data2 = decode_data2[1:]
+ item, total_bytes2 = asn1decode(decode_data2)
+ self['SuportedMech'] = item
+
+ decode_data = decode_data[1:]
+ decode_data = decode_data[total_bytes:]
+ next_byte = unpack('B', decode_data[:1])[0]
+ if next_byte != ASN1_RESPONSE_TOKEN:
+ raise Exception('Response token tag not found %x' % next_byte)
+
+ decode_data = decode_data[1:]
+ decode_data, total_bytes = asn1decode(decode_data)
+ next_byte = unpack('B', decode_data[:1])[0]
+ if next_byte != ASN1_OCTET_STRING:
+ raise Exception('Octet string token tag not found %x' % next_byte)
+ decode_data = decode_data[1:]
+ decode_data, total_bytes = asn1decode(decode_data)
+ self['ResponseToken'] = decode_data
+
+ def dump(self):
+ for i in self.fields.keys():
+ print "%s: {%r}" % (i,self[i])
+
+ def getData(self):
+ ans = pack('B',SPNEGO_NegTokenResp.SPNEGO_NEG_TOKEN_RESP)
+ if self.fields.has_key('NegResult') and self.fields.has_key('SupportedMech'):
+ # Server resp
+ ans += asn1encode(
+ pack('B', ASN1_SEQUENCE) +
+ asn1encode(
+ pack('B',SPNEGO_NegTokenResp.SPNEGO_NEG_TOKEN_TARG) +
+ asn1encode(
+ pack('B',ASN1_ENUMERATED) +
+ asn1encode( self['NegResult'] )) +
+ pack('B',ASN1_SUPPORTED_MECH) +
+ asn1encode(
+ pack('B',ASN1_OID) +
+ asn1encode(self['SupportedMech'])) +
+ pack('B',ASN1_RESPONSE_TOKEN ) +
+ asn1encode(
+ pack('B', ASN1_OCTET_STRING) + asn1encode(self['ResponseToken']))))
+ elif self.fields.has_key('NegResult'):
+ # Server resp
+ ans += asn1encode(
+ pack('B', ASN1_SEQUENCE) +
+ asn1encode(
+ pack('B', SPNEGO_NegTokenResp.SPNEGO_NEG_TOKEN_TARG) +
+ asn1encode(
+ pack('B',ASN1_ENUMERATED) +
+ asn1encode( self['NegResult'] ))))
+ else:
+ # Client resp
+ ans += asn1encode(
+ pack('B', ASN1_SEQUENCE) +
+ asn1encode(
+ pack('B', ASN1_RESPONSE_TOKEN) +
+ asn1encode(
+ pack('B', ASN1_OCTET_STRING) + asn1encode(self['ResponseToken']))))
+ return ans
+
+class SPNEGO_NegTokenInit(GSSAPI):
+ # http://tools.ietf.org/html/rfc4178#page-8
+ # NegTokeInit :: = SEQUENCE {
+ # mechTypes [0] MechTypeList,
+ # reqFlags [1] ContextFlags OPTIONAL,
+ # mechToken [2] OCTET STRING OPTIONAL,
+ # mechListMIC [3] OCTET STRING OPTIONAL,
+ # }
+ SPNEGO_NEG_TOKEN_INIT = 0xa0
+ def fromString(self, data = 0):
+ GSSAPI.fromString(self, data)
+ payload = self['Payload']
+ next_byte = unpack('B', payload[:1])[0]
+ if next_byte != SPNEGO_NegTokenInit.SPNEGO_NEG_TOKEN_INIT:
+ raise Exception('NegTokenInit not found %x' % next_byte)
+ payload = payload[1:]
+ decode_data, total_bytes = asn1decode(payload)
+ # Now we should have a SEQUENCE Tag
+ next_byte = unpack('B', decode_data[:1])[0]
+ if next_byte != ASN1_SEQUENCE:
+ raise Exception('SEQUENCE tag not found %x' % next_byte)
+ decode_data = decode_data[1:]
+ decode_data, total_bytes2 = asn1decode(decode_data)
+ next_byte = unpack('B',decode_data[:1])[0]
+ if next_byte != ASN1_MECH_TYPE:
+ raise Exception('MechType tag not found %x' % next_byte)
+ decode_data = decode_data[1:]
+ remaining_data = decode_data
+ decode_data, total_bytes3 = asn1decode(decode_data)
+ next_byte = unpack('B', decode_data[:1])[0]
+ if next_byte != ASN1_SEQUENCE:
+ raise Exception('SEQUENCE tag not found %x' % next_byte)
+ decode_data = decode_data[1:]
+ decode_data, total_bytes4 = asn1decode(decode_data)
+ # And finally we should have the MechTypes
+ self['MechTypes'] = []
+ while decode_data:
+ next_byte = unpack('B', decode_data[:1])[0]
+ if next_byte != ASN1_OID:
+ # Not a valid OID, there must be something else we won't unpack
+ break
+ decode_data = decode_data[1:]
+ item, total_bytes = asn1decode(decode_data)
+ self['MechTypes'].append(item)
+ decode_data = decode_data[total_bytes:]
+
+ # Do we have MechTokens as well?
+ decode_data = remaining_data[total_bytes3:]
+ if len(decode_data) > 0:
+ next_byte = unpack('B', decode_data[:1])[0]
+ if next_byte == ASN1_MECH_TOKEN:
+ # We have tokens in here!
+ decode_data = decode_data[1:]
+ decode_data, total_bytes = asn1decode(decode_data)
+ next_byte = unpack('B', decode_data[:1])[0]
+ if next_byte == ASN1_OCTET_STRING:
+ decode_data = decode_data[1:]
+ decode_data, total_bytes = asn1decode(decode_data)
+ self['MechToken'] = decode_data
+
+ def getData(self):
+ mechTypes = ''
+ for i in self['MechTypes']:
+ mechTypes += pack('B', ASN1_OID)
+ mechTypes += asn1encode(i)
+
+ mechToken = ''
+ # Do we have tokens to send?
+ if self.fields.has_key('MechToken'):
+ mechToken = pack('B', ASN1_MECH_TOKEN) + asn1encode(
+ pack('B', ASN1_OCTET_STRING) + asn1encode(
+ self['MechToken']))
+
+ ans = pack('B',SPNEGO_NegTokenInit.SPNEGO_NEG_TOKEN_INIT)
+ ans += asn1encode(
+ pack('B', ASN1_SEQUENCE) +
+ asn1encode(
+ pack('B', ASN1_MECH_TYPE) +
+ asn1encode(
+ pack('B', ASN1_SEQUENCE) +
+ asn1encode(mechTypes)) + mechToken ))
+
+
+ self['Payload'] = ans
+ return GSSAPI.getData(self)
+