summaryrefslogtreecommitdiff
path: root/python/samba
diff options
context:
space:
mode:
authorStefan Metzmacher <metze@samba.org>2016-09-16 11:11:58 +0200
committerAndreas Schneider <asn@cryptomilk.org>2016-10-26 11:20:19 +0200
commitd5b58bb730127a89feced87ad2218c4fdd1f8e1c (patch)
treef20f1422a9af6f9a5cf9d26a209de141b32a9237 /python/samba
parentb788507cff78603912a469ca75b48739a834fd63 (diff)
downloadsamba-d5b58bb730127a89feced87ad2218c4fdd1f8e1c.tar.gz
python:tests: add more helper functions to RawDCERPCTest
Signed-off-by: Stefan Metzmacher <metze@samba.org> Reviewed-by: Andreas Schneider <asn@samba.org>
Diffstat (limited to 'python/samba')
-rw-r--r--python/samba/tests/__init__.py420
1 files changed, 354 insertions, 66 deletions
diff --git a/python/samba/tests/__init__.py b/python/samba/tests/__init__.py
index dec9ff52e5a..de3dc9edd8e 100644
--- a/python/samba/tests/__init__.py
+++ b/python/samba/tests/__init__.py
@@ -29,6 +29,8 @@ import samba.ndr
import samba.dcerpc.dcerpc
import samba.dcerpc.base
import samba.dcerpc.epmapper
+from samba.credentials import Credentials
+from samba import gensec
import socket
import struct
import subprocess
@@ -273,37 +275,352 @@ class RawDCERPCTest(TestCase):
self.connect()
- def epmap_reconnect(self, abstract):
- ndr32 = samba.dcerpc.base.transfer_syntax_ndr()
-
- tsf0_list = [ndr32]
- ctx0 = samba.dcerpc.dcerpc.ctx_list()
- ctx0.context_id = 1
- ctx0.num_transfer_syntaxes = len(tsf0_list)
- ctx0.abstract_syntax = samba.dcerpc.epmapper.abstract_syntax()
- ctx0.transfer_syntaxes = tsf0_list
+ def get_user_creds(self):
+ c = Credentials()
+ c.guess()
+ username = samba.tests.env_get_var_value('USERNAME')
+ password = samba.tests.env_get_var_value('PASSWORD')
+ c.set_username(username)
+ c.set_password(password)
+ return c
+
+ def get_anon_creds(self):
+ c = Credentials()
+ c.set_anonymous()
+ return c
+
+ def get_auth_context_creds(self, creds, auth_type, auth_level,
+ auth_context_id,
+ g_auth_level=None):
+
+ if g_auth_level is None:
+ g_auth_level = auth_level
+
+ g = gensec.Security.start_client(self.settings)
+ g.set_credentials(creds)
+ g.want_feature(gensec.FEATURE_DCE_STYLE)
+ g.start_mech_by_authtype(auth_type, g_auth_level)
+
+ auth_context = {}
+ auth_context["auth_type"] = auth_type
+ auth_context["auth_level"] = auth_level
+ auth_context["auth_context_id"] = auth_context_id
+ auth_context["g_auth_level"] = g_auth_level
+ auth_context["gensec"] = g
+
+ return auth_context
+
+ def do_generic_bind(self, ctx, auth_context=None,
+ pfc_flags=samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
+ samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
+ assoc_group_id=0, call_id=0,
+ nak_reason=None, alter_fault=None):
+ ctx_list = [ctx]
+
+ if auth_context is not None:
+ from_server = ""
+ (finished, to_server) = auth_context["gensec"].update(from_server)
+ self.assertFalse(finished)
+
+ auth_info = self.generate_auth(auth_type=auth_context["auth_type"],
+ auth_level=auth_context["auth_level"],
+ auth_context_id=auth_context["auth_context_id"],
+ auth_blob=to_server)
+ else:
+ auth_info = ""
- req = self.generate_bind(call_id=0, ctx_list=[ctx0])
+ req = self.generate_bind(call_id=call_id,
+ pfc_flags=pfc_flags,
+ ctx_list=ctx_list,
+ assoc_group_id=assoc_group_id,
+ auth_info=auth_info)
self.send_pdu(req)
rep = self.recv_pdu()
- self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_BIND_ACK,
- req.call_id, auth_length=0)
- self.assertEqual(rep.u.max_xmit_frag, req.u.max_xmit_frag)
- self.assertEqual(rep.u.max_recv_frag, req.u.max_recv_frag)
- self.assertNotEqual(rep.u.assoc_group_id, req.u.assoc_group_id)
- self.assertEqual(rep.u.secondary_address_size, 4)
- self.assertEqual(rep.u.secondary_address, "%d" % self.tcp_port)
- self.assertEqual(len(rep.u._pad1), 2)
- self.assertEqual(rep.u._pad1, '\0' * 2)
- self.assertEqual(rep.u.num_results, 1)
- self.assertEqual(rep.u.ctx_list[0].result,
+ if nak_reason is not None:
+ self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_BIND_NAK, req.call_id,
+ auth_length=0)
+ self.assertEquals(rep.u.reject_reason, nak_reason)
+ self.assertEquals(rep.u.num_versions, 1)
+ self.assertEquals(rep.u.versions[0].rpc_vers, req.rpc_vers)
+ self.assertEquals(rep.u.versions[0].rpc_vers_minor, req.rpc_vers_minor)
+ self.assertEquals(len(rep.u._pad), 3)
+ self.assertEquals(rep.u._pad, '\0' * 3)
+ return
+ self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_BIND_ACK, req.call_id,
+ pfc_flags=pfc_flags)
+ self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag)
+ self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag)
+ if assoc_group_id != 0:
+ self.assertEquals(rep.u.assoc_group_id, assoc_group_id)
+ else:
+ self.assertNotEquals(rep.u.assoc_group_id, 0)
+ assoc_group_id = rep.u.assoc_group_id
+ port_str = "%d" % self.tcp_port
+ port_len = len(port_str) + 1
+ mod_len = (2 + port_len) % 4
+ if mod_len != 0:
+ port_pad = 4 - mod_len
+ else:
+ port_pad = 0
+ self.assertEquals(rep.u.secondary_address_size, port_len)
+ self.assertEquals(rep.u.secondary_address, port_str)
+ self.assertEquals(len(rep.u._pad1), port_pad)
+ # sometimes windows sends random bytes
+ # self.assertEquals(rep.u._pad1, '\0' * port_pad)
+ self.assertEquals(rep.u.num_results, 1)
+ self.assertEquals(rep.u.ctx_list[0].result,
samba.dcerpc.dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE)
- self.assertEqual(rep.u.ctx_list[0].reason,
+ self.assertEquals(rep.u.ctx_list[0].reason,
samba.dcerpc.dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED)
- self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32)
- self.assertEqual(rep.u.auth_info, '\0' * 0)
+ self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ctx.transfer_syntaxes[0])
+ ack = rep
+ if auth_context is None:
+ self.assertEquals(rep.auth_length, 0)
+ self.assertEquals(len(rep.u.auth_info), 0)
+ return ack
+ self.assertNotEquals(rep.auth_length, 0)
+ self.assertGreater(len(rep.u.auth_info), samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
+ self.assertEquals(rep.auth_length, len(rep.u.auth_info) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
+
+ a = self.parse_auth(rep.u.auth_info)
+
+ from_server = a.credentials
+ (finished, to_server) = auth_context["gensec"].update(from_server)
+ self.assertFalse(finished)
+
+ auth_info = self.generate_auth(auth_type=auth_context["auth_type"],
+ auth_level=auth_context["auth_level"],
+ auth_context_id=auth_context["auth_context_id"],
+ auth_blob=to_server)
+ req = self.generate_alter(call_id=call_id,
+ ctx_list=ctx_list,
+ assoc_group_id=0xffffffff-assoc_group_id,
+ auth_info=auth_info)
+ self.send_pdu(req)
+ rep = self.recv_pdu()
+ if alter_fault is not None:
+ self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_FAULT, req.call_id,
+ pfc_flags=req.pfc_flags |
+ samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE,
+ auth_length=0)
+ self.assertNotEquals(rep.u.alloc_hint, 0)
+ self.assertEquals(rep.u.context_id, 0)
+ self.assertEquals(rep.u.cancel_count, 0)
+ self.assertEquals(rep.u.flags, 0)
+ self.assertEquals(rep.u.status, alter_fault)
+ self.assertEquals(rep.u.reserved, 0)
+ self.assertEquals(len(rep.u.error_and_verifier), 0)
+ return None
+ self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id)
+ self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag)
+ self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag)
+ self.assertEquals(rep.u.assoc_group_id, assoc_group_id)
+ self.assertEquals(rep.u.secondary_address_size, 0)
+ self.assertEquals(rep.u.secondary_address, '')
+ self.assertEquals(len(rep.u._pad1), 2)
+ # sometimes windows sends random bytes
+ # self.assertEquals(rep.u._pad1, '\0' * 2)
+ self.assertEquals(rep.u.num_results, 1)
+ self.assertEquals(rep.u.ctx_list[0].result,
+ samba.dcerpc.dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE)
+ self.assertEquals(rep.u.ctx_list[0].reason,
+ samba.dcerpc.dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED)
+ self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ctx.transfer_syntaxes[0])
+ self.assertNotEquals(rep.auth_length, 0)
+ self.assertGreater(len(rep.u.auth_info), samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
+ self.assertEquals(rep.auth_length, len(rep.u.auth_info) - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH)
+
+ a = self.parse_auth(rep.u.auth_info)
+
+ from_server = a.credentials
+ (finished, to_server) = auth_context["gensec"].update(from_server)
+ self.assertTrue(finished)
+
+ return ack
+
+ def prepare_presentation(self, abstract, transfer, object=None,
+ context_id=0xffff, epmap=False, auth_context=None,
+ pfc_flags=samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
+ samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
+ assoc_group_id=0,
+ return_ack=False):
+ if epmap:
+ self.epmap_reconnect(abstract, transfer=transfer, object=object)
+
+ tsf1_list = [transfer]
+ ctx = samba.dcerpc.dcerpc.ctx_list()
+ ctx.context_id = context_id
+ ctx.num_transfer_syntaxes = len(tsf1_list)
+ ctx.abstract_syntax = abstract
+ ctx.transfer_syntaxes = tsf1_list
+
+ ack = self.do_generic_bind(ctx=ctx,
+ auth_context=auth_context,
+ pfc_flags=pfc_flags,
+ assoc_group_id=assoc_group_id)
+ if ack is None:
+ ctx = None
+
+ if return_ack:
+ return (ctx, ack)
+ return ctx
+
+ def do_single_request(self, call_id, ctx, io,
+ auth_context=None,
+ object=None,
+ bigendian=False, ndr64=False,
+ allow_remaining=False,
+ send_req=True,
+ recv_rep=True,
+ fault_pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST |
+ samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST,
+ fault_status=None,
+ fault_context_id=None,
+ timeout=None,
+ ndr_print=None,
+ hexdump=None):
+
+ if fault_context_id is None:
+ fault_context_id = ctx.context_id
+
+ if ndr_print is None:
+ ndr_print = self.do_ndr_print
+ if hexdump is None:
+ hexdump = self.do_hexdump
+
+ if send_req:
+ if ndr_print:
+ sys.stderr.write("in: %s" % samba.ndr.ndr_print_in(io))
+ stub_in = samba.ndr.ndr_pack_in(io, bigendian=bigendian, ndr64=ndr64)
+ if hexdump:
+ sys.stderr.write("stub_in: %d\n%s" % (len(stub_in), self.hexdump(stub_in)))
+ else:
+ # only used for sig_size calculation
+ stub_in = '\xff' * samba.dcerpc.dcerpc.DCERPC_AUTH_PAD_ALIGNMENT
+
+ sig_size = 0
+ if auth_context is not None:
+ mod_len = len(stub_in) % samba.dcerpc.dcerpc.DCERPC_AUTH_PAD_ALIGNMENT
+ auth_pad_length = 0
+ if mod_len > 0:
+ auth_pad_length = samba.dcerpc.dcerpc.DCERPC_AUTH_PAD_ALIGNMENT - mod_len
+ stub_in += '\x00' * auth_pad_length
+
+ if auth_context["g_auth_level"] >= samba.dcerpc.dcerpc.DCERPC_AUTH_LEVEL_PACKET:
+ sig_size = auth_context["gensec"].sig_size(len(stub_in))
+ else:
+ sig_size = 16
+
+ zero_sig = "\x00"*sig_size
+ auth_info = self.generate_auth(auth_type=auth_context["auth_type"],
+ auth_level=auth_context["auth_level"],
+ auth_pad_length=auth_pad_length,
+ auth_context_id=auth_context["auth_context_id"],
+ auth_blob=zero_sig)
+ else:
+ auth_info=""
+
+ pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST
+ pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST
+ if object is not None:
+ pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_OBJECT_UUID
+
+ req = self.generate_request(call_id=call_id,
+ context_id=ctx.context_id,
+ pfc_flags=pfc_flags,
+ object=object,
+ opnum=io.opnum(),
+ stub=stub_in,
+ auth_info=auth_info)
+
+ if send_req:
+ if sig_size != 0 and auth_context["auth_level"] >= samba.dcerpc.dcerpc.DCERPC_AUTH_LEVEL_PACKET:
+ req_blob = samba.ndr.ndr_pack(req)
+ ofs_stub = samba.dcerpc.dcerpc.DCERPC_REQUEST_LENGTH
+ ofs_sig = len(req_blob) - req.auth_length
+ ofs_trailer = ofs_sig - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH
+ req_data = req_blob[ofs_stub:ofs_trailer]
+ req_whole = req_blob[0:ofs_sig]
+ sig = auth_context["gensec"].sign_packet(req_data, req_whole)
+ auth_info = self.generate_auth(auth_type=auth_context["auth_type"],
+ auth_level=auth_context["auth_level"],
+ auth_pad_length=auth_pad_length,
+ auth_context_id=auth_context["auth_context_id"],
+ auth_blob=sig)
+ req = self.generate_request(call_id=call_id,
+ context_id=ctx.context_id,
+ pfc_flags=pfc_flags,
+ object=object,
+ opnum=io.opnum(),
+ stub=stub_in,
+ auth_info=auth_info)
+ self.send_pdu(req, ndr_print=ndr_print, hexdump=hexdump)
+ if recv_rep:
+ (rep, rep_blob) = self.recv_pdu_raw(timeout=timeout,
+ ndr_print=ndr_print,
+ hexdump=hexdump)
+ if fault_status:
+ self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_FAULT, req.call_id,
+ pfc_flags=fault_pfc_flags, auth_length=0)
+ self.assertNotEquals(rep.u.alloc_hint, 0)
+ self.assertEquals(rep.u.context_id, fault_context_id)
+ self.assertEquals(rep.u.cancel_count, 0)
+ self.assertEquals(rep.u.flags, 0)
+ self.assertEquals(rep.u.status, fault_status)
+ self.assertEquals(rep.u.reserved, 0)
+ self.assertEquals(len(rep.u.error_and_verifier), 0)
+ return
+
+ self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
+ auth_length=sig_size)
+ self.assertNotEquals(rep.u.alloc_hint, 0)
+ self.assertEquals(rep.u.context_id, req.u.context_id & 0xff)
+ self.assertEquals(rep.u.cancel_count, 0)
+ self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
+ if sig_size != 0:
+
+ ofs_stub = samba.dcerpc.dcerpc.DCERPC_REQUEST_LENGTH
+ ofs_sig = rep.frag_length - rep.auth_length
+ ofs_trailer = ofs_sig - samba.dcerpc.dcerpc.DCERPC_AUTH_TRAILER_LENGTH
+ rep_data = rep_blob[ofs_stub:ofs_trailer]
+ rep_whole = rep_blob[0:ofs_sig]
+ rep_sig = rep_blob[ofs_sig:]
+ rep_auth_info_blob = rep_blob[ofs_trailer:]
+
+ rep_auth_info = self.parse_auth(rep_auth_info_blob)
+ self.assertEquals(rep_auth_info.auth_type, auth_context["auth_type"])
+ self.assertEquals(rep_auth_info.auth_level, auth_context["auth_level"])
+ self.assertLessEqual(rep_auth_info.auth_pad_length, len(rep_data))
+ self.assertEquals(rep_auth_info.auth_reserved, 0)
+ self.assertEquals(rep_auth_info.auth_context_id, auth_context["auth_context_id"])
+ self.assertEquals(rep_auth_info.credentials, rep_sig)
+
+ if auth_context["auth_level"] >= samba.dcerpc.dcerpc.DCERPC_AUTH_LEVEL_PACKET:
+ auth_context["gensec"].check_packet(rep_data, rep_whole, rep_sig)
+
+ stub_out = rep_data[0:-rep_auth_info.auth_pad_length]
+ else:
+ stub_out = rep.u.stub_and_verifier
+
+ if hexdump:
+ sys.stderr.write("stub_out: %d\n%s" % (len(stub_out), self.hexdump(stub_out)))
+ samba.ndr.ndr_unpack_out(io, stub_out, bigendian=bigendian, ndr64=ndr64,
+ allow_remaining=allow_remaining)
+ if ndr_print:
+ sys.stderr.write("out: %s" % samba.ndr.ndr_print_out(io))
+
+ def epmap_reconnect(self, abstract, transfer=None, object=None):
+ ndr32 = samba.dcerpc.base.transfer_syntax_ndr()
+
+ if transfer is None:
+ transfer = ndr32
+
+ if object is None:
+ object = samba.dcerpc.misc.GUID()
+
+ ctx = self.prepare_presentation(samba.dcerpc.epmapper.abstract_syntax(),
+ transfer, context_id=0)
- # And now try a request
data1 = samba.ndr.ndr_pack(abstract)
lhs1 = samba.dcerpc.epmapper.epm_lhs()
lhs1.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_UUID
@@ -313,7 +630,7 @@ class RawDCERPCTest(TestCase):
floor1 = samba.dcerpc.epmapper.epm_floor()
floor1.lhs = lhs1
floor1.rhs = rhs1
- data2 = samba.ndr.ndr_pack(ndr32)
+ data2 = samba.ndr.ndr_pack(transfer)
lhs2 = samba.dcerpc.epmapper.epm_lhs()
lhs2.protocol = samba.dcerpc.epmapper.EPM_PROTOCOL_UUID
lhs2.lhs_data = data2[:18]
@@ -348,46 +665,17 @@ class RawDCERPCTest(TestCase):
req_twr = samba.dcerpc.epmapper.epm_twr_t()
req_twr.tower = req_tower
- pack_twr = samba.ndr.ndr_pack(req_twr)
-
- # object
- stub = "\x01\x00\x00\x00"
- stub += "\x00" * 16
- # tower
- stub += "\x02\x00\x00\x00"
- stub += pack_twr
- # padding?
- stub += "\x00" * 1
- # handle
- stub += "\x00" * 20
- # max_towers
- stub += "\x04\x00\x00\x00"
-
- # we do an epm_Map() request
- req = self.generate_request(call_id = 1,
- context_id=ctx0.context_id,
- opnum=3,
- stub=stub)
- self.send_pdu(req)
- rep = self.recv_pdu()
- self.verify_pdu(rep, samba.dcerpc.dcerpc.DCERPC_PKT_RESPONSE,
- req.call_id, auth_length=0)
- self.assertNotEqual(rep.u.alloc_hint, 0)
- self.assertEqual(rep.u.context_id, req.u.context_id)
- self.assertEqual(rep.u.cancel_count, 0)
- self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
-
- num_towers = struct.unpack_from("<I", rep.u.stub_and_verifier, 20)
- (array_max, array_ofs, array_cnt) = struct.unpack_from("<III", rep.u.stub_and_verifier, 24)
- status = struct.unpack_from("<I", rep.u.stub_and_verifier, len(rep.u.stub_and_verifier) - 4)
- self.assertEqual(status[0], 0)
- self.assertGreaterEqual(num_towers[0], 1)
- self.assertEqual(array_max, 4)
- self.assertEqual(array_ofs, 0)
- self.assertGreaterEqual(array_cnt, 1)
-
- unpack_twr = rep.u.stub_and_verifier[(36 + 4 * array_cnt):-4]
- rep_twr = samba.ndr.ndr_unpack(samba.dcerpc.epmapper.epm_twr_t, unpack_twr, allow_remaining=True)
+ epm_map = samba.dcerpc.epmapper.epm_Map()
+ epm_map.in_object = object
+ epm_map.in_map_tower = req_twr
+ epm_map.in_entry_handle = samba.dcerpc.misc.policy_handle()
+ epm_map.in_max_towers = 4
+
+ self.do_single_request(call_id=2, ctx=ctx, io=epm_map)
+
+ self.assertGreaterEqual(epm_map.out_num_towers, 1)
+ rep_twr = epm_map.out_towers[0].twr
+ self.assertIsNotNone(rep_twr)
self.assertEqual(rep_twr.tower_length, 75)
self.assertEqual(rep_twr.tower.num_floors, 5)
self.assertEqual(len(rep_twr.tower.floors), 5)