summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorStefan Metzmacher <metze@samba.org>2018-11-21 11:49:40 +0100
committerJeremy Allison <jra@samba.org>2019-01-12 03:13:40 +0100
commit5466ed1232ee5e13335a79bf04e6205f901f6e21 (patch)
tree76c90e0aa385e7d787730f19b072ede1c9c43f7b /python
parentd47f9af7cbb09ed015abcfc6ea7d19ec443bdcf2 (diff)
downloadsamba-5466ed1232ee5e13335a79bf04e6205f901f6e21.tar.gz
py:dcerpc/raw_protocol: add tests for delayed header signing activation
BUG: https://bugzilla.samba.org/show_bug.cgi?id=7113 BUG: https://bugzilla.samba.org/show_bug.cgi?id=11892 Signed-off-by: Stefan Metzmacher <metze@samba.org> Reviewed-by: Jeremy Allison <jra@samba.org>
Diffstat (limited to 'python')
-rwxr-xr-xpython/samba/tests/dcerpc/raw_protocol.py186
1 files changed, 186 insertions, 0 deletions
diff --git a/python/samba/tests/dcerpc/raw_protocol.py b/python/samba/tests/dcerpc/raw_protocol.py
index aab6d86253f..384b7d47bca 100755
--- a/python/samba/tests/dcerpc/raw_protocol.py
+++ b/python/samba/tests/dcerpc/raw_protocol.py
@@ -4988,6 +4988,192 @@ class TestDCERPC_BIND(RawDCERPCTest):
conn2._disconnect("End of Test")
return
+ def _test_krb5_hdr_sign_delayed1(self, do_upgrade):
+ auth_type = dcerpc.DCERPC_AUTH_TYPE_KRB5
+ auth_level = dcerpc.DCERPC_AUTH_LEVEL_INTEGRITY
+ auth_context_id = 1
+
+ creds = self.get_user_creds()
+
+ abstract = samba.dcerpc.mgmt.abstract_syntax()
+ transfer = base.transfer_syntax_ndr()
+
+ tsf1_list = [transfer]
+ ctx = samba.dcerpc.dcerpc.ctx_list()
+ ctx.context_id = 1
+ ctx.num_transfer_syntaxes = len(tsf1_list)
+ ctx.abstract_syntax = abstract
+ ctx.transfer_syntaxes = tsf1_list
+
+ auth_context = self.get_auth_context_creds(creds=creds,
+ auth_type=auth_type,
+ auth_level=auth_level,
+ auth_context_id=auth_context_id,
+ hdr_signing=False)
+
+ ack = self.do_generic_bind(call_id=1,
+ ctx=ctx,
+ auth_context=auth_context)
+
+ inq_if_ids = samba.dcerpc.mgmt.inq_if_ids()
+ self.do_single_request(call_id=2, ctx=ctx, io=inq_if_ids,
+ auth_context=auth_context)
+
+ #
+ # This is just an alter context without authentication
+ # But it can turn on header signing for the whole connection
+ #
+ ack2 = self.do_generic_bind(call_id=3, ctx=ctx,
+ pfc_flags=dcerpc.DCERPC_PFC_FLAG_FIRST |
+ dcerpc.DCERPC_PFC_FLAG_LAST |
+ dcerpc.DCERPC_PFC_FLAG_SUPPORT_HEADER_SIGN,
+ assoc_group_id = ack.u.assoc_group_id,
+ start_with_alter=True)
+
+ self.assertFalse(auth_context['hdr_signing'])
+ if do_upgrade:
+ auth_context['hdr_signing'] = True
+ auth_context["gensec"].want_feature(gensec.FEATURE_SIGN_PKT_HEADER)
+ fault_status=None
+ else:
+ fault_status=dcerpc.DCERPC_FAULT_SEC_PKG_ERROR
+
+ self.do_single_request(call_id=4, ctx=ctx, io=inq_if_ids,
+ auth_context=auth_context,
+ fault_status=fault_status)
+
+ if fault_status is not None:
+ # wait for a disconnect
+ rep = self.recv_pdu()
+ self.assertIsNone(rep)
+ self.assertNotConnected()
+ return
+
+ self.do_single_request(call_id=5, ctx=ctx, io=inq_if_ids,
+ auth_context=auth_context)
+ return
+
+ def test_krb5_hdr_sign_delayed1_ok1(self):
+ return self._test_krb5_hdr_sign_delayed1(do_upgrade=True)
+
+ def test_krb5_hdr_sign_delayed1_fail1(self):
+ return self._test_krb5_hdr_sign_delayed1(do_upgrade=False)
+
+ def _test_krb5_hdr_sign_delayed2(self, do_upgrade):
+ auth_type = dcerpc.DCERPC_AUTH_TYPE_KRB5
+ auth_level = dcerpc.DCERPC_AUTH_LEVEL_INTEGRITY
+ auth_context_id = 1
+
+ creds = self.get_user_creds()
+
+ abstract = samba.dcerpc.mgmt.abstract_syntax()
+ transfer = base.transfer_syntax_ndr()
+
+ tsf1_list = [transfer]
+ ctx = samba.dcerpc.dcerpc.ctx_list()
+ ctx.context_id = 1
+ ctx.num_transfer_syntaxes = len(tsf1_list)
+ ctx.abstract_syntax = abstract
+ ctx.transfer_syntaxes = tsf1_list
+
+ auth_context = self.get_auth_context_creds(creds=creds,
+ auth_type=auth_type,
+ auth_level=auth_level,
+ auth_context_id=auth_context_id,
+ hdr_signing=False)
+
+ #
+ # SUPPORT_HEADER_SIGN on alter context activates header signing
+ #
+ ack = self.do_generic_bind(call_id=1,
+ ctx=ctx,
+ auth_context=auth_context,
+ pfc_flags_2nd=dcerpc.DCERPC_PFC_FLAG_FIRST |
+ dcerpc.DCERPC_PFC_FLAG_LAST |
+ dcerpc.DCERPC_PFC_FLAG_SUPPORT_HEADER_SIGN)
+
+ self.assertFalse(auth_context['hdr_signing'])
+ if do_upgrade:
+ auth_context['hdr_signing'] = True
+ auth_context["gensec"].want_feature(gensec.FEATURE_SIGN_PKT_HEADER)
+ fault_status=None
+ else:
+ fault_status=dcerpc.DCERPC_FAULT_SEC_PKG_ERROR
+
+ inq_if_ids = samba.dcerpc.mgmt.inq_if_ids()
+ self.do_single_request(call_id=4, ctx=ctx, io=inq_if_ids,
+ auth_context=auth_context,
+ fault_status=fault_status)
+
+ if fault_status is not None:
+ # wait for a disconnect
+ rep = self.recv_pdu()
+ self.assertIsNone(rep)
+ self.assertNotConnected()
+ return
+
+ self.do_single_request(call_id=5, ctx=ctx, io=inq_if_ids,
+ auth_context=auth_context)
+ return
+
+ def test_krb5_hdr_sign_delayed2_ok1(self):
+ return self._test_krb5_hdr_sign_delayed2(do_upgrade=True)
+
+ def test_krb5_hdr_sign_delayed2_fail1(self):
+ return self._test_krb5_hdr_sign_delayed2(do_upgrade=False)
+
+ def test_krb5_hdr_sign_delayed3_fail1(self):
+ auth_type = dcerpc.DCERPC_AUTH_TYPE_KRB5
+ auth_level = dcerpc.DCERPC_AUTH_LEVEL_INTEGRITY
+ auth_context_id = 1
+
+ creds = self.get_user_creds()
+
+ abstract = samba.dcerpc.mgmt.abstract_syntax()
+ transfer = base.transfer_syntax_ndr()
+
+ tsf1_list = [transfer]
+ ctx = samba.dcerpc.dcerpc.ctx_list()
+ ctx.context_id = 1
+ ctx.num_transfer_syntaxes = len(tsf1_list)
+ ctx.abstract_syntax = abstract
+ ctx.transfer_syntaxes = tsf1_list
+
+ auth_context = self.get_auth_context_creds(creds=creds,
+ auth_type=auth_type,
+ auth_level=auth_level,
+ auth_context_id=auth_context_id,
+ hdr_signing=False)
+
+ #
+ # SUPPORT_HEADER_SIGN on auth3 doesn't activate header signing
+ #
+ ack = self.do_generic_bind(call_id=1,
+ ctx=ctx,
+ auth_context=auth_context,
+ pfc_flags_2nd=dcerpc.DCERPC_PFC_FLAG_FIRST |
+ dcerpc.DCERPC_PFC_FLAG_LAST |
+ dcerpc.DCERPC_PFC_FLAG_SUPPORT_HEADER_SIGN,
+ use_auth3=True)
+
+ inq_if_ids = samba.dcerpc.mgmt.inq_if_ids()
+ self.do_single_request(call_id=2, ctx=ctx, io=inq_if_ids,
+ auth_context=auth_context)
+
+ self.assertFalse(auth_context['hdr_signing'])
+ auth_context['hdr_signing'] = True
+ auth_context["gensec"].want_feature(gensec.FEATURE_SIGN_PKT_HEADER)
+ fault_status=dcerpc.DCERPC_FAULT_SEC_PKG_ERROR
+
+ self.do_single_request(call_id=4, ctx=ctx, io=inq_if_ids,
+ auth_context=auth_context,
+ fault_status=fault_status)
+
+ # wait for a disconnect
+ rep = self.recv_pdu()
+ self.assertIsNone(rep)
+ self.assertNotConnected()
+ return
if __name__ == "__main__":
global_ndr_print = True