diff options
author | Stefan Metzmacher <metze@samba.org> | 2018-11-21 11:49:40 +0100 |
---|---|---|
committer | Jeremy Allison <jra@samba.org> | 2019-01-12 03:13:40 +0100 |
commit | 5466ed1232ee5e13335a79bf04e6205f901f6e21 (patch) | |
tree | 76c90e0aa385e7d787730f19b072ede1c9c43f7b /python | |
parent | d47f9af7cbb09ed015abcfc6ea7d19ec443bdcf2 (diff) | |
download | samba-5466ed1232ee5e13335a79bf04e6205f901f6e21.tar.gz |
py:dcerpc/raw_protocol: add tests for delayed header signing activation
BUG: https://bugzilla.samba.org/show_bug.cgi?id=7113
BUG: https://bugzilla.samba.org/show_bug.cgi?id=11892
Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
Diffstat (limited to 'python')
-rwxr-xr-x | python/samba/tests/dcerpc/raw_protocol.py | 186 |
1 files changed, 186 insertions, 0 deletions
diff --git a/python/samba/tests/dcerpc/raw_protocol.py b/python/samba/tests/dcerpc/raw_protocol.py index aab6d86253f..384b7d47bca 100755 --- a/python/samba/tests/dcerpc/raw_protocol.py +++ b/python/samba/tests/dcerpc/raw_protocol.py @@ -4988,6 +4988,192 @@ class TestDCERPC_BIND(RawDCERPCTest): conn2._disconnect("End of Test") return + def _test_krb5_hdr_sign_delayed1(self, do_upgrade): + auth_type = dcerpc.DCERPC_AUTH_TYPE_KRB5 + auth_level = dcerpc.DCERPC_AUTH_LEVEL_INTEGRITY + auth_context_id = 1 + + creds = self.get_user_creds() + + abstract = samba.dcerpc.mgmt.abstract_syntax() + transfer = base.transfer_syntax_ndr() + + tsf1_list = [transfer] + ctx = samba.dcerpc.dcerpc.ctx_list() + ctx.context_id = 1 + ctx.num_transfer_syntaxes = len(tsf1_list) + ctx.abstract_syntax = abstract + ctx.transfer_syntaxes = tsf1_list + + auth_context = self.get_auth_context_creds(creds=creds, + auth_type=auth_type, + auth_level=auth_level, + auth_context_id=auth_context_id, + hdr_signing=False) + + ack = self.do_generic_bind(call_id=1, + ctx=ctx, + auth_context=auth_context) + + inq_if_ids = samba.dcerpc.mgmt.inq_if_ids() + self.do_single_request(call_id=2, ctx=ctx, io=inq_if_ids, + auth_context=auth_context) + + # + # This is just an alter context without authentication + # But it can turn on header signing for the whole connection + # + ack2 = self.do_generic_bind(call_id=3, ctx=ctx, + pfc_flags=dcerpc.DCERPC_PFC_FLAG_FIRST | + dcerpc.DCERPC_PFC_FLAG_LAST | + dcerpc.DCERPC_PFC_FLAG_SUPPORT_HEADER_SIGN, + assoc_group_id = ack.u.assoc_group_id, + start_with_alter=True) + + self.assertFalse(auth_context['hdr_signing']) + if do_upgrade: + auth_context['hdr_signing'] = True + auth_context["gensec"].want_feature(gensec.FEATURE_SIGN_PKT_HEADER) + fault_status=None + else: + fault_status=dcerpc.DCERPC_FAULT_SEC_PKG_ERROR + + self.do_single_request(call_id=4, ctx=ctx, io=inq_if_ids, + auth_context=auth_context, + fault_status=fault_status) + + if fault_status is not None: + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + return + + self.do_single_request(call_id=5, ctx=ctx, io=inq_if_ids, + auth_context=auth_context) + return + + def test_krb5_hdr_sign_delayed1_ok1(self): + return self._test_krb5_hdr_sign_delayed1(do_upgrade=True) + + def test_krb5_hdr_sign_delayed1_fail1(self): + return self._test_krb5_hdr_sign_delayed1(do_upgrade=False) + + def _test_krb5_hdr_sign_delayed2(self, do_upgrade): + auth_type = dcerpc.DCERPC_AUTH_TYPE_KRB5 + auth_level = dcerpc.DCERPC_AUTH_LEVEL_INTEGRITY + auth_context_id = 1 + + creds = self.get_user_creds() + + abstract = samba.dcerpc.mgmt.abstract_syntax() + transfer = base.transfer_syntax_ndr() + + tsf1_list = [transfer] + ctx = samba.dcerpc.dcerpc.ctx_list() + ctx.context_id = 1 + ctx.num_transfer_syntaxes = len(tsf1_list) + ctx.abstract_syntax = abstract + ctx.transfer_syntaxes = tsf1_list + + auth_context = self.get_auth_context_creds(creds=creds, + auth_type=auth_type, + auth_level=auth_level, + auth_context_id=auth_context_id, + hdr_signing=False) + + # + # SUPPORT_HEADER_SIGN on alter context activates header signing + # + ack = self.do_generic_bind(call_id=1, + ctx=ctx, + auth_context=auth_context, + pfc_flags_2nd=dcerpc.DCERPC_PFC_FLAG_FIRST | + dcerpc.DCERPC_PFC_FLAG_LAST | + dcerpc.DCERPC_PFC_FLAG_SUPPORT_HEADER_SIGN) + + self.assertFalse(auth_context['hdr_signing']) + if do_upgrade: + auth_context['hdr_signing'] = True + auth_context["gensec"].want_feature(gensec.FEATURE_SIGN_PKT_HEADER) + fault_status=None + else: + fault_status=dcerpc.DCERPC_FAULT_SEC_PKG_ERROR + + inq_if_ids = samba.dcerpc.mgmt.inq_if_ids() + self.do_single_request(call_id=4, ctx=ctx, io=inq_if_ids, + auth_context=auth_context, + fault_status=fault_status) + + if fault_status is not None: + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + return + + self.do_single_request(call_id=5, ctx=ctx, io=inq_if_ids, + auth_context=auth_context) + return + + def test_krb5_hdr_sign_delayed2_ok1(self): + return self._test_krb5_hdr_sign_delayed2(do_upgrade=True) + + def test_krb5_hdr_sign_delayed2_fail1(self): + return self._test_krb5_hdr_sign_delayed2(do_upgrade=False) + + def test_krb5_hdr_sign_delayed3_fail1(self): + auth_type = dcerpc.DCERPC_AUTH_TYPE_KRB5 + auth_level = dcerpc.DCERPC_AUTH_LEVEL_INTEGRITY + auth_context_id = 1 + + creds = self.get_user_creds() + + abstract = samba.dcerpc.mgmt.abstract_syntax() + transfer = base.transfer_syntax_ndr() + + tsf1_list = [transfer] + ctx = samba.dcerpc.dcerpc.ctx_list() + ctx.context_id = 1 + ctx.num_transfer_syntaxes = len(tsf1_list) + ctx.abstract_syntax = abstract + ctx.transfer_syntaxes = tsf1_list + + auth_context = self.get_auth_context_creds(creds=creds, + auth_type=auth_type, + auth_level=auth_level, + auth_context_id=auth_context_id, + hdr_signing=False) + + # + # SUPPORT_HEADER_SIGN on auth3 doesn't activate header signing + # + ack = self.do_generic_bind(call_id=1, + ctx=ctx, + auth_context=auth_context, + pfc_flags_2nd=dcerpc.DCERPC_PFC_FLAG_FIRST | + dcerpc.DCERPC_PFC_FLAG_LAST | + dcerpc.DCERPC_PFC_FLAG_SUPPORT_HEADER_SIGN, + use_auth3=True) + + inq_if_ids = samba.dcerpc.mgmt.inq_if_ids() + self.do_single_request(call_id=2, ctx=ctx, io=inq_if_ids, + auth_context=auth_context) + + self.assertFalse(auth_context['hdr_signing']) + auth_context['hdr_signing'] = True + auth_context["gensec"].want_feature(gensec.FEATURE_SIGN_PKT_HEADER) + fault_status=dcerpc.DCERPC_FAULT_SEC_PKG_ERROR + + self.do_single_request(call_id=4, ctx=ctx, io=inq_if_ids, + auth_context=auth_context, + fault_status=fault_status) + + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + return if __name__ == "__main__": global_ndr_print = True |