From 5466ed1232ee5e13335a79bf04e6205f901f6e21 Mon Sep 17 00:00:00 2001 From: Stefan Metzmacher Date: Wed, 21 Nov 2018 11:49:40 +0100 Subject: py:dcerpc/raw_protocol: add tests for delayed header signing activation BUG: https://bugzilla.samba.org/show_bug.cgi?id=7113 BUG: https://bugzilla.samba.org/show_bug.cgi?id=11892 Signed-off-by: Stefan Metzmacher Reviewed-by: Jeremy Allison --- python/samba/tests/dcerpc/raw_protocol.py | 186 ++++++++++++++++++++++++++++++ 1 file changed, 186 insertions(+) (limited to 'python') diff --git a/python/samba/tests/dcerpc/raw_protocol.py b/python/samba/tests/dcerpc/raw_protocol.py index aab6d86253f..384b7d47bca 100755 --- a/python/samba/tests/dcerpc/raw_protocol.py +++ b/python/samba/tests/dcerpc/raw_protocol.py @@ -4988,6 +4988,192 @@ class TestDCERPC_BIND(RawDCERPCTest): conn2._disconnect("End of Test") return + def _test_krb5_hdr_sign_delayed1(self, do_upgrade): + auth_type = dcerpc.DCERPC_AUTH_TYPE_KRB5 + auth_level = dcerpc.DCERPC_AUTH_LEVEL_INTEGRITY + auth_context_id = 1 + + creds = self.get_user_creds() + + abstract = samba.dcerpc.mgmt.abstract_syntax() + transfer = base.transfer_syntax_ndr() + + tsf1_list = [transfer] + ctx = samba.dcerpc.dcerpc.ctx_list() + ctx.context_id = 1 + ctx.num_transfer_syntaxes = len(tsf1_list) + ctx.abstract_syntax = abstract + ctx.transfer_syntaxes = tsf1_list + + auth_context = self.get_auth_context_creds(creds=creds, + auth_type=auth_type, + auth_level=auth_level, + auth_context_id=auth_context_id, + hdr_signing=False) + + ack = self.do_generic_bind(call_id=1, + ctx=ctx, + auth_context=auth_context) + + inq_if_ids = samba.dcerpc.mgmt.inq_if_ids() + self.do_single_request(call_id=2, ctx=ctx, io=inq_if_ids, + auth_context=auth_context) + + # + # This is just an alter context without authentication + # But it can turn on header signing for the whole connection + # + ack2 = self.do_generic_bind(call_id=3, ctx=ctx, + pfc_flags=dcerpc.DCERPC_PFC_FLAG_FIRST | + dcerpc.DCERPC_PFC_FLAG_LAST | + dcerpc.DCERPC_PFC_FLAG_SUPPORT_HEADER_SIGN, + assoc_group_id = ack.u.assoc_group_id, + start_with_alter=True) + + self.assertFalse(auth_context['hdr_signing']) + if do_upgrade: + auth_context['hdr_signing'] = True + auth_context["gensec"].want_feature(gensec.FEATURE_SIGN_PKT_HEADER) + fault_status=None + else: + fault_status=dcerpc.DCERPC_FAULT_SEC_PKG_ERROR + + self.do_single_request(call_id=4, ctx=ctx, io=inq_if_ids, + auth_context=auth_context, + fault_status=fault_status) + + if fault_status is not None: + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + return + + self.do_single_request(call_id=5, ctx=ctx, io=inq_if_ids, + auth_context=auth_context) + return + + def test_krb5_hdr_sign_delayed1_ok1(self): + return self._test_krb5_hdr_sign_delayed1(do_upgrade=True) + + def test_krb5_hdr_sign_delayed1_fail1(self): + return self._test_krb5_hdr_sign_delayed1(do_upgrade=False) + + def _test_krb5_hdr_sign_delayed2(self, do_upgrade): + auth_type = dcerpc.DCERPC_AUTH_TYPE_KRB5 + auth_level = dcerpc.DCERPC_AUTH_LEVEL_INTEGRITY + auth_context_id = 1 + + creds = self.get_user_creds() + + abstract = samba.dcerpc.mgmt.abstract_syntax() + transfer = base.transfer_syntax_ndr() + + tsf1_list = [transfer] + ctx = samba.dcerpc.dcerpc.ctx_list() + ctx.context_id = 1 + ctx.num_transfer_syntaxes = len(tsf1_list) + ctx.abstract_syntax = abstract + ctx.transfer_syntaxes = tsf1_list + + auth_context = self.get_auth_context_creds(creds=creds, + auth_type=auth_type, + auth_level=auth_level, + auth_context_id=auth_context_id, + hdr_signing=False) + + # + # SUPPORT_HEADER_SIGN on alter context activates header signing + # + ack = self.do_generic_bind(call_id=1, + ctx=ctx, + auth_context=auth_context, + pfc_flags_2nd=dcerpc.DCERPC_PFC_FLAG_FIRST | + dcerpc.DCERPC_PFC_FLAG_LAST | + dcerpc.DCERPC_PFC_FLAG_SUPPORT_HEADER_SIGN) + + self.assertFalse(auth_context['hdr_signing']) + if do_upgrade: + auth_context['hdr_signing'] = True + auth_context["gensec"].want_feature(gensec.FEATURE_SIGN_PKT_HEADER) + fault_status=None + else: + fault_status=dcerpc.DCERPC_FAULT_SEC_PKG_ERROR + + inq_if_ids = samba.dcerpc.mgmt.inq_if_ids() + self.do_single_request(call_id=4, ctx=ctx, io=inq_if_ids, + auth_context=auth_context, + fault_status=fault_status) + + if fault_status is not None: + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + return + + self.do_single_request(call_id=5, ctx=ctx, io=inq_if_ids, + auth_context=auth_context) + return + + def test_krb5_hdr_sign_delayed2_ok1(self): + return self._test_krb5_hdr_sign_delayed2(do_upgrade=True) + + def test_krb5_hdr_sign_delayed2_fail1(self): + return self._test_krb5_hdr_sign_delayed2(do_upgrade=False) + + def test_krb5_hdr_sign_delayed3_fail1(self): + auth_type = dcerpc.DCERPC_AUTH_TYPE_KRB5 + auth_level = dcerpc.DCERPC_AUTH_LEVEL_INTEGRITY + auth_context_id = 1 + + creds = self.get_user_creds() + + abstract = samba.dcerpc.mgmt.abstract_syntax() + transfer = base.transfer_syntax_ndr() + + tsf1_list = [transfer] + ctx = samba.dcerpc.dcerpc.ctx_list() + ctx.context_id = 1 + ctx.num_transfer_syntaxes = len(tsf1_list) + ctx.abstract_syntax = abstract + ctx.transfer_syntaxes = tsf1_list + + auth_context = self.get_auth_context_creds(creds=creds, + auth_type=auth_type, + auth_level=auth_level, + auth_context_id=auth_context_id, + hdr_signing=False) + + # + # SUPPORT_HEADER_SIGN on auth3 doesn't activate header signing + # + ack = self.do_generic_bind(call_id=1, + ctx=ctx, + auth_context=auth_context, + pfc_flags_2nd=dcerpc.DCERPC_PFC_FLAG_FIRST | + dcerpc.DCERPC_PFC_FLAG_LAST | + dcerpc.DCERPC_PFC_FLAG_SUPPORT_HEADER_SIGN, + use_auth3=True) + + inq_if_ids = samba.dcerpc.mgmt.inq_if_ids() + self.do_single_request(call_id=2, ctx=ctx, io=inq_if_ids, + auth_context=auth_context) + + self.assertFalse(auth_context['hdr_signing']) + auth_context['hdr_signing'] = True + auth_context["gensec"].want_feature(gensec.FEATURE_SIGN_PKT_HEADER) + fault_status=dcerpc.DCERPC_FAULT_SEC_PKG_ERROR + + self.do_single_request(call_id=4, ctx=ctx, io=inq_if_ids, + auth_context=auth_context, + fault_status=fault_status) + + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + return if __name__ == "__main__": global_ndr_print = True -- cgit v1.2.1