diff options
Diffstat (limited to 'tests/python_dependencies/impacket/spnego.py')
-rw-r--r-- | tests/python_dependencies/impacket/spnego.py | 372 |
1 files changed, 372 insertions, 0 deletions
diff --git a/tests/python_dependencies/impacket/spnego.py b/tests/python_dependencies/impacket/spnego.py new file mode 100644 index 000000000..f177d18f7 --- /dev/null +++ b/tests/python_dependencies/impacket/spnego.py @@ -0,0 +1,372 @@ +# Copyright (c) 2003-2016 CORE Security Technologies +# +# This software is provided under under a slightly modified version +# of the Apache Software License. See the accompanying LICENSE file +# for more information. +# +# Author: Alberto Solino (beto@coresecurity.com) +# +# Description: +# SPNEGO functions used by SMB, SMB2/3 and DCERPC +# + +from struct import pack, unpack, calcsize + +############### GSS Stuff ################ +GSS_API_SPNEGO_UUID = '\x2b\x06\x01\x05\x05\x02' +ASN1_SEQUENCE = 0x30 +ASN1_AID = 0x60 +ASN1_OID = 0x06 +ASN1_OCTET_STRING = 0x04 +ASN1_MECH_TYPE = 0xa0 +ASN1_MECH_TOKEN = 0xa2 +ASN1_SUPPORTED_MECH = 0xa1 +ASN1_RESPONSE_TOKEN = 0xa2 +ASN1_ENUMERATED = 0x0a +MechTypes = { +'+\x06\x01\x04\x01\x827\x02\x02\x1e': 'SNMPv2-SMI::enterprises.311.2.2.30', +'+\x06\x01\x04\x01\x827\x02\x02\n': 'NTLMSSP - Microsoft NTLM Security Support Provider', +'*\x86H\x82\xf7\x12\x01\x02\x02': 'MS KRB5 - Microsoft Kerberos 5', +'*\x86H\x86\xf7\x12\x01\x02\x02': 'KRB5 - Kerberos 5', +'*\x86H\x86\xf7\x12\x01\x02\x02\x03': 'KRB5 - Kerberos 5 - User to User' +} +TypesMech = dict((v,k) for k, v in MechTypes.iteritems()) + +def asn1encode(data = ''): + #res = asn1.SEQUENCE(str).encode() + #import binascii + #print '\nalex asn1encode str: %s\n' % binascii.hexlify(str) + if 0 <= len(data) <= 0x7F: + res = pack('B', len(data)) + data + elif 0x80 <= len(data) <= 0xFF: + res = pack('BB', 0x81, len(data)) + data + elif 0x100 <= len(data) <= 0xFFFF: + res = pack('!BH', 0x82, len(data)) + data + elif 0x10000 <= len(data) <= 0xffffff: + res = pack('!BBH', 0x83, len(data) >> 16, len(data) & 0xFFFF) + data + elif 0x1000000 <= len(data) <= 0xffffffff: + res = pack('!BL', 0x84, len(data)) + data + else: + raise Exception('Error in asn1encode') + return str(res) + +def asn1decode(data = ''): + len1 = unpack('B', data[:1])[0] + data = data[1:] + if len1 == 0x81: + pad = calcsize('B') + len2 = unpack('B',data[:pad])[0] + data = data[pad:] + ans = data[:len2] + elif len1 == 0x82: + pad = calcsize('H') + len2 = unpack('!H', data[:pad])[0] + data = data[pad:] + ans = data[:len2] + elif len1 == 0x83: + pad = calcsize('B') + calcsize('!H') + len2, len3 = unpack('!BH', data[:pad]) + data = data[pad:] + ans = data[:len2 << 16 + len3] + elif len1 == 0x84: + pad = calcsize('!L') + len2 = unpack('!L', data[:pad])[0] + data = data[pad:] + ans = data[:len2] + # 1 byte length, string <= 0x7F + else: + pad = 0 + ans = data[:len1] + return ans, len(ans)+pad+1 + +class GSSAPI: +# Generic GSSAPI Header Format + def __init__(self, data = None): + self.fields = {} + self['UUID'] = GSS_API_SPNEGO_UUID + if data: + self.fromString(data) + pass + + def __setitem__(self,key,value): + self.fields[key] = value + + def __getitem__(self, key): + return self.fields[key] + + def __delitem__(self, key): + del self.fields[key] + + def __len__(self): + return len(self.getData()) + + def __str__(self): + return len(self.getData()) + + def fromString(self, data = None): + # Manual parse of the GSSAPI Header Format + # It should be something like + # AID = 0x60 TAG, BER Length + # OID = 0x06 TAG + # GSSAPI OID + # UUID data (BER Encoded) + # Payload + next_byte = unpack('B',data[:1])[0] + if next_byte != ASN1_AID: + raise Exception('Unknown AID=%x' % next_byte) + data = data[1:] + decode_data, total_bytes = asn1decode(data) + # Now we should have a OID tag + next_byte = unpack('B',decode_data[:1])[0] + if next_byte != ASN1_OID: + raise Exception('OID tag not found %x' % next_byte) + decode_data = decode_data[1:] + # Now the OID contents, should be SPNEGO UUID + uuid, total_bytes = asn1decode(decode_data) + self['OID'] = uuid + # the rest should be the data + self['Payload'] = decode_data[total_bytes:] + #pass + + def dump(self): + for i in self.fields.keys(): + print "%s: {%r}" % (i,self[i]) + + def getData(self): + ans = pack('B',ASN1_AID) + ans += asn1encode( + pack('B',ASN1_OID) + + asn1encode(self['UUID']) + + self['Payload'] ) + return ans + +class SPNEGO_NegTokenResp: + # http://tools.ietf.org/html/rfc4178#page-9 + # NegTokenResp ::= SEQUENCE { + # negState [0] ENUMERATED { + # accept-completed (0), + # accept-incomplete (1), + # reject (2), + # request-mic (3) + # } OPTIONAL, + # -- REQUIRED in the first reply from the target + # supportedMech [1] MechType OPTIONAL, + # -- present only in the first reply from the target + # responseToken [2] OCTET STRING OPTIONAL, + # mechListMIC [3] OCTET STRING OPTIONAL, + # ... + # } + # This structure is not prepended by a GSS generic header! + SPNEGO_NEG_TOKEN_RESP = 0xa1 + SPNEGO_NEG_TOKEN_TARG = 0xa0 + + def __init__(self, data = None): + self.fields = {} + if data: + self.fromString(data) + pass + + def __setitem__(self,key,value): + self.fields[key] = value + + def __getitem__(self, key): + return self.fields[key] + + def __delitem__(self, key): + del self.fields[key] + + def __len__(self): + return len(self.getData()) + + def __str__(self): + return len(self.getData()) + + def fromString(self, data = 0): + payload = data + next_byte = unpack('B', payload[:1])[0] + if next_byte != SPNEGO_NegTokenResp.SPNEGO_NEG_TOKEN_RESP: + raise Exception('NegTokenResp not found %x' % next_byte) + payload = payload[1:] + decode_data, total_bytes = asn1decode(payload) + next_byte = unpack('B', decode_data[:1])[0] + if next_byte != ASN1_SEQUENCE: + raise Exception('SEQUENCE tag not found %x' % next_byte) + decode_data = decode_data[1:] + decode_data, total_bytes = asn1decode(decode_data) + next_byte = unpack('B',decode_data[:1])[0] + + if next_byte != ASN1_MECH_TYPE: + # MechType not found, could be an AUTH answer + if next_byte != ASN1_RESPONSE_TOKEN: + raise Exception('MechType/ResponseToken tag not found %x' % next_byte) + else: + decode_data2 = decode_data[1:] + decode_data2, total_bytes = asn1decode(decode_data2) + next_byte = unpack('B', decode_data2[:1])[0] + if next_byte != ASN1_ENUMERATED: + raise Exception('Enumerated tag not found %x' % next_byte) + item, total_bytes2 = asn1decode(decode_data) + self['NegResult'] = item + decode_data = decode_data[1:] + decode_data = decode_data[total_bytes:] + + # Do we have more data? + if len(decode_data) == 0: + return + + next_byte = unpack('B', decode_data[:1])[0] + if next_byte != ASN1_SUPPORTED_MECH: + if next_byte != ASN1_RESPONSE_TOKEN: + raise Exception('Supported Mech/ResponseToken tag not found %x' % next_byte) + else: + decode_data2 = decode_data[1:] + decode_data2, total_bytes = asn1decode(decode_data2) + next_byte = unpack('B', decode_data2[:1])[0] + if next_byte != ASN1_OID: + raise Exception('OID tag not found %x' % next_byte) + decode_data2 = decode_data2[1:] + item, total_bytes2 = asn1decode(decode_data2) + self['SuportedMech'] = item + + decode_data = decode_data[1:] + decode_data = decode_data[total_bytes:] + next_byte = unpack('B', decode_data[:1])[0] + if next_byte != ASN1_RESPONSE_TOKEN: + raise Exception('Response token tag not found %x' % next_byte) + + decode_data = decode_data[1:] + decode_data, total_bytes = asn1decode(decode_data) + next_byte = unpack('B', decode_data[:1])[0] + if next_byte != ASN1_OCTET_STRING: + raise Exception('Octet string token tag not found %x' % next_byte) + decode_data = decode_data[1:] + decode_data, total_bytes = asn1decode(decode_data) + self['ResponseToken'] = decode_data + + def dump(self): + for i in self.fields.keys(): + print "%s: {%r}" % (i,self[i]) + + def getData(self): + ans = pack('B',SPNEGO_NegTokenResp.SPNEGO_NEG_TOKEN_RESP) + if self.fields.has_key('NegResult') and self.fields.has_key('SupportedMech'): + # Server resp + ans += asn1encode( + pack('B', ASN1_SEQUENCE) + + asn1encode( + pack('B',SPNEGO_NegTokenResp.SPNEGO_NEG_TOKEN_TARG) + + asn1encode( + pack('B',ASN1_ENUMERATED) + + asn1encode( self['NegResult'] )) + + pack('B',ASN1_SUPPORTED_MECH) + + asn1encode( + pack('B',ASN1_OID) + + asn1encode(self['SupportedMech'])) + + pack('B',ASN1_RESPONSE_TOKEN ) + + asn1encode( + pack('B', ASN1_OCTET_STRING) + asn1encode(self['ResponseToken'])))) + elif self.fields.has_key('NegResult'): + # Server resp + ans += asn1encode( + pack('B', ASN1_SEQUENCE) + + asn1encode( + pack('B', SPNEGO_NegTokenResp.SPNEGO_NEG_TOKEN_TARG) + + asn1encode( + pack('B',ASN1_ENUMERATED) + + asn1encode( self['NegResult'] )))) + else: + # Client resp + ans += asn1encode( + pack('B', ASN1_SEQUENCE) + + asn1encode( + pack('B', ASN1_RESPONSE_TOKEN) + + asn1encode( + pack('B', ASN1_OCTET_STRING) + asn1encode(self['ResponseToken'])))) + return ans + +class SPNEGO_NegTokenInit(GSSAPI): + # http://tools.ietf.org/html/rfc4178#page-8 + # NegTokeInit :: = SEQUENCE { + # mechTypes [0] MechTypeList, + # reqFlags [1] ContextFlags OPTIONAL, + # mechToken [2] OCTET STRING OPTIONAL, + # mechListMIC [3] OCTET STRING OPTIONAL, + # } + SPNEGO_NEG_TOKEN_INIT = 0xa0 + def fromString(self, data = 0): + GSSAPI.fromString(self, data) + payload = self['Payload'] + next_byte = unpack('B', payload[:1])[0] + if next_byte != SPNEGO_NegTokenInit.SPNEGO_NEG_TOKEN_INIT: + raise Exception('NegTokenInit not found %x' % next_byte) + payload = payload[1:] + decode_data, total_bytes = asn1decode(payload) + # Now we should have a SEQUENCE Tag + next_byte = unpack('B', decode_data[:1])[0] + if next_byte != ASN1_SEQUENCE: + raise Exception('SEQUENCE tag not found %x' % next_byte) + decode_data = decode_data[1:] + decode_data, total_bytes2 = asn1decode(decode_data) + next_byte = unpack('B',decode_data[:1])[0] + if next_byte != ASN1_MECH_TYPE: + raise Exception('MechType tag not found %x' % next_byte) + decode_data = decode_data[1:] + remaining_data = decode_data + decode_data, total_bytes3 = asn1decode(decode_data) + next_byte = unpack('B', decode_data[:1])[0] + if next_byte != ASN1_SEQUENCE: + raise Exception('SEQUENCE tag not found %x' % next_byte) + decode_data = decode_data[1:] + decode_data, total_bytes4 = asn1decode(decode_data) + # And finally we should have the MechTypes + self['MechTypes'] = [] + while decode_data: + next_byte = unpack('B', decode_data[:1])[0] + if next_byte != ASN1_OID: + # Not a valid OID, there must be something else we won't unpack + break + decode_data = decode_data[1:] + item, total_bytes = asn1decode(decode_data) + self['MechTypes'].append(item) + decode_data = decode_data[total_bytes:] + + # Do we have MechTokens as well? + decode_data = remaining_data[total_bytes3:] + if len(decode_data) > 0: + next_byte = unpack('B', decode_data[:1])[0] + if next_byte == ASN1_MECH_TOKEN: + # We have tokens in here! + decode_data = decode_data[1:] + decode_data, total_bytes = asn1decode(decode_data) + next_byte = unpack('B', decode_data[:1])[0] + if next_byte == ASN1_OCTET_STRING: + decode_data = decode_data[1:] + decode_data, total_bytes = asn1decode(decode_data) + self['MechToken'] = decode_data + + def getData(self): + mechTypes = '' + for i in self['MechTypes']: + mechTypes += pack('B', ASN1_OID) + mechTypes += asn1encode(i) + + mechToken = '' + # Do we have tokens to send? + if self.fields.has_key('MechToken'): + mechToken = pack('B', ASN1_MECH_TOKEN) + asn1encode( + pack('B', ASN1_OCTET_STRING) + asn1encode( + self['MechToken'])) + + ans = pack('B',SPNEGO_NegTokenInit.SPNEGO_NEG_TOKEN_INIT) + ans += asn1encode( + pack('B', ASN1_SEQUENCE) + + asn1encode( + pack('B', ASN1_MECH_TYPE) + + asn1encode( + pack('B', ASN1_SEQUENCE) + + asn1encode(mechTypes)) + mechToken )) + + + self['Payload'] = ans + return GSSAPI.getData(self) + |