summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStefan Metzmacher <metze@samba.org>2016-06-23 12:06:40 +0200
committerKarolin Seeger <kseeger@samba.org>2016-08-04 13:43:46 +0200
commit67859d0f316aaf62ef662d9b9ff80381a07bfed8 (patch)
tree3e1dd4edcea47b2be6581554509480e6ebf98542
parent00b06bccde173d8b9613d34c1f3e9c4e4788ca12 (diff)
downloadsamba-67859d0f316aaf62ef662d9b9ff80381a07bfed8.tar.gz
python/tests: add auth_pad test for the dcerpc raw_protocol test
BUG: https://bugzilla.samba.org/show_bug.cgi?id=11982 Signed-off-by: Stefan Metzmacher <metze@samba.org> Reviewed-by: Ralph Boehme <slow@samba.org> Autobuild-User(master): Stefan Metzmacher <metze@samba.org> Autobuild-Date(master): Fri Jun 24 18:08:44 CEST 2016 on sn-devel-144 (cherry picked from commit c49f9abb19adca999d0b1d897d00d91f0ad91bbd) Autobuild-User(v4-2-test): Karolin Seeger <kseeger@samba.org> Autobuild-Date(v4-2-test): Thu Aug 4 13:43:46 CEST 2016 on sn-devel-104
-rwxr-xr-xpython/samba/tests/dcerpc/raw_protocol.py548
1 files changed, 548 insertions, 0 deletions
diff --git a/python/samba/tests/dcerpc/raw_protocol.py b/python/samba/tests/dcerpc/raw_protocol.py
index ccd0f6bd210..8b0bc4eb8a3 100755
--- a/python/samba/tests/dcerpc/raw_protocol.py
+++ b/python/samba/tests/dcerpc/raw_protocol.py
@@ -2616,6 +2616,554 @@ class TestDCERPC_BIND(RawDCERPCTest):
self.assertIsNone(rep)
self.assertNotConnected()
+ def test_spnego_auth_pad_ok(self):
+ ndr32 = base.transfer_syntax_ndr()
+
+ tsf1_list = [ndr32]
+ ctx1 = dcerpc.ctx_list()
+ ctx1.context_id = 1
+ ctx1.num_transfer_syntaxes = len(tsf1_list)
+ ctx1.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax()
+ ctx1.transfer_syntaxes = tsf1_list
+ ctx_list = [ctx1]
+
+ c = Credentials()
+ c.set_anonymous()
+ g = gensec.Security.start_client(self.settings)
+ g.set_credentials(c)
+ g.want_feature(gensec.FEATURE_DCE_STYLE)
+ auth_type = dcerpc.DCERPC_AUTH_TYPE_SPNEGO
+ auth_level = dcerpc.DCERPC_AUTH_LEVEL_CONNECT
+ auth_context_id = 2
+ g.start_mech_by_authtype(auth_type, auth_level)
+ from_server = ""
+ (finished, to_server) = g.update(from_server)
+ self.assertFalse(finished)
+
+ auth_info = self.generate_auth(auth_type=auth_type,
+ auth_level=auth_level,
+ auth_context_id=auth_context_id,
+ auth_blob=to_server)
+
+ req = self.generate_bind(call_id=0,
+ ctx_list=ctx_list,
+ auth_info=auth_info)
+ req_pdu = samba.ndr.ndr_pack(req)
+
+ auth_pad_ok = len(req_pdu)
+ auth_pad_ok -= dcerpc.DCERPC_REQUEST_LENGTH
+ auth_pad_ok -= dcerpc.DCERPC_AUTH_TRAILER_LENGTH
+ auth_pad_ok -= len(to_server)
+
+ auth_info = self.generate_auth(auth_type=auth_type,
+ auth_level=auth_level,
+ auth_context_id=auth_context_id,
+ auth_pad_length=auth_pad_ok,
+ auth_blob=to_server)
+
+ req = self.generate_bind(call_id=0,
+ ctx_list=ctx_list,
+ auth_info=auth_info)
+ self.send_pdu(req)
+ rep = self.recv_pdu()
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id)
+ self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag)
+ self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag)
+ self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id)
+ self.assertEquals(rep.u.secondary_address_size, 4)
+ self.assertEquals(rep.u.secondary_address, "%d" % self.tcp_port)
+ self.assertEquals(len(rep.u._pad1), 2)
+ #self.assertEquals(rep.u._pad1, '\0' * 2)
+ self.assertEquals(rep.u.num_results, 1)
+ self.assertEquals(rep.u.ctx_list[0].result,
+ dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE)
+ self.assertEquals(rep.u.ctx_list[0].reason,
+ dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED)
+ self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32)
+ self.assertNotEquals(len(rep.u.auth_info), 0)
+ a = self.parse_auth(rep.u.auth_info)
+
+ from_server = a.credentials
+ (finished, to_server) = g.update(from_server)
+ self.assertFalse(finished)
+
+ auth_info = self.generate_auth(auth_type=auth_type,
+ auth_level=auth_level,
+ auth_context_id=auth_context_id,
+ auth_blob=to_server)
+ req = self.generate_alter(call_id=0,
+ ctx_list=ctx_list,
+ assoc_group_id=rep.u.assoc_group_id,
+ auth_info=auth_info)
+ req_pdu = samba.ndr.ndr_pack(req)
+
+ auth_pad_ok = len(req_pdu)
+ auth_pad_ok -= dcerpc.DCERPC_REQUEST_LENGTH
+ auth_pad_ok -= dcerpc.DCERPC_AUTH_TRAILER_LENGTH
+ auth_pad_ok -= len(to_server)
+ auth_info = self.generate_auth(auth_type=auth_type,
+ auth_level=auth_level,
+ auth_context_id=auth_context_id,
+ auth_pad_length=auth_pad_ok,
+ auth_blob=to_server)
+ req = self.generate_alter(call_id=0,
+ ctx_list=ctx_list,
+ assoc_group_id=rep.u.assoc_group_id,
+ auth_info=auth_info)
+ self.send_pdu(req)
+ rep = self.recv_pdu()
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id)
+ self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag)
+ self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag)
+ self.assertEquals(rep.u.assoc_group_id, req.u.assoc_group_id)
+ self.assertEquals(rep.u.secondary_address_size, 0)
+ self.assertEquals(len(rep.u._pad1), 2)
+ # Windows sends garbage
+ #self.assertEquals(rep.u._pad1, '\0' * 2)
+ self.assertEquals(rep.u.num_results, 1)
+ self.assertEquals(rep.u.ctx_list[0].result,
+ dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE)
+ self.assertEquals(rep.u.ctx_list[0].reason,
+ dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED)
+ self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32)
+ self.assertNotEquals(len(rep.u.auth_info), 0)
+ a = self.parse_auth(rep.u.auth_info)
+
+ from_server = a.credentials
+ (finished, to_server) = g.update(from_server)
+ self.assertTrue(finished)
+
+ # And now try a request without auth_info
+ req = self.generate_request(call_id = 2,
+ context_id=ctx1.context_id,
+ opnum=0,
+ stub="")
+ self.send_pdu(req)
+ rep = self.recv_pdu()
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
+ auth_length=0)
+ self.assertNotEquals(rep.u.alloc_hint, 0)
+ self.assertEquals(rep.u.context_id, req.u.context_id)
+ self.assertEquals(rep.u.cancel_count, 0)
+ self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
+
+ # Now a request with auth_info DCERPC_AUTH_LEVEL_CONNECT
+ auth_info = self.generate_auth(auth_type=auth_type,
+ auth_level=auth_level,
+ auth_context_id=auth_context_id,
+ auth_blob="\x01"+"\x00"*15)
+ req = self.generate_request(call_id = 3,
+ context_id=ctx1.context_id,
+ opnum=0,
+ stub="",
+ auth_info=auth_info)
+ self.send_pdu(req)
+ rep = self.recv_pdu()
+ # We don't get an auth_info back
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
+ auth_length=0)
+ self.assertNotEquals(rep.u.alloc_hint, 0)
+ self.assertEquals(rep.u.context_id, req.u.context_id)
+ self.assertEquals(rep.u.cancel_count, 0)
+ self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
+
+ self._disconnect("disconnect")
+ self.assertNotConnected()
+
+ def test_spnego_auth_pad_fail_bind(self):
+ ndr32 = base.transfer_syntax_ndr()
+
+ tsf1_list = [ndr32]
+ ctx1 = dcerpc.ctx_list()
+ ctx1.context_id = 1
+ ctx1.num_transfer_syntaxes = len(tsf1_list)
+ ctx1.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax()
+ ctx1.transfer_syntaxes = tsf1_list
+ ctx_list = [ctx1]
+
+ c = Credentials()
+ c.set_anonymous()
+ g = gensec.Security.start_client(self.settings)
+ g.set_credentials(c)
+ g.want_feature(gensec.FEATURE_DCE_STYLE)
+ auth_type = dcerpc.DCERPC_AUTH_TYPE_SPNEGO
+ auth_level = dcerpc.DCERPC_AUTH_LEVEL_CONNECT
+ auth_context_id = 2
+ g.start_mech_by_authtype(auth_type, auth_level)
+ from_server = ""
+ (finished, to_server) = g.update(from_server)
+ self.assertFalse(finished)
+
+ auth_info = self.generate_auth(auth_type=auth_type,
+ auth_level=auth_level,
+ auth_context_id=auth_context_id,
+ auth_blob=to_server)
+
+ req = self.generate_bind(call_id=0,
+ ctx_list=ctx_list,
+ auth_info=auth_info)
+ req_pdu = samba.ndr.ndr_pack(req)
+
+ auth_pad_ok = len(req_pdu)
+ auth_pad_ok -= dcerpc.DCERPC_REQUEST_LENGTH
+ auth_pad_ok -= dcerpc.DCERPC_AUTH_TRAILER_LENGTH
+ auth_pad_ok -= len(to_server)
+ auth_pad_bad = auth_pad_ok + 1
+ auth_info = self.generate_auth(auth_type=auth_type,
+ auth_level=auth_level,
+ auth_context_id=auth_context_id,
+ auth_pad_length=auth_pad_bad,
+ auth_blob=to_server)
+
+ req = self.generate_bind(call_id=0,
+ ctx_list=ctx_list,
+ auth_info=auth_info)
+ self.send_pdu(req)
+ rep = self.recv_pdu()
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_NAK, req.call_id,
+ auth_length=0)
+ self.assertEquals(rep.u.reject_reason,
+ dcerpc.DCERPC_BIND_NAK_REASON_PROTOCOL_VERSION_NOT_SUPPORTED)
+ self.assertEquals(rep.u.num_versions, 1)
+ self.assertEquals(rep.u.versions[0].rpc_vers, req.rpc_vers)
+ self.assertEquals(rep.u.versions[0].rpc_vers_minor, req.rpc_vers_minor)
+ self.assertEquals(len(rep.u._pad), 3)
+ self.assertEquals(rep.u._pad, '\0' * 3)
+
+ # wait for a disconnect
+ rep = self.recv_pdu()
+ self.assertIsNone(rep)
+ self.assertNotConnected()
+
+ def test_spnego_auth_pad_fail_alter(self):
+ ndr32 = base.transfer_syntax_ndr()
+
+ tsf1_list = [ndr32]
+ ctx1 = dcerpc.ctx_list()
+ ctx1.context_id = 1
+ ctx1.num_transfer_syntaxes = len(tsf1_list)
+ ctx1.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax()
+ ctx1.transfer_syntaxes = tsf1_list
+ ctx_list = [ctx1]
+
+ c = Credentials()
+ c.set_anonymous()
+ g = gensec.Security.start_client(self.settings)
+ g.set_credentials(c)
+ g.want_feature(gensec.FEATURE_DCE_STYLE)
+ auth_type = dcerpc.DCERPC_AUTH_TYPE_SPNEGO
+ auth_level = dcerpc.DCERPC_AUTH_LEVEL_CONNECT
+ auth_context_id = 2
+ g.start_mech_by_authtype(auth_type, auth_level)
+ from_server = ""
+ (finished, to_server) = g.update(from_server)
+ self.assertFalse(finished)
+
+ auth_info = self.generate_auth(auth_type=auth_type,
+ auth_level=auth_level,
+ auth_context_id=auth_context_id,
+ auth_blob=to_server)
+
+ req = self.generate_bind(call_id=0,
+ ctx_list=ctx_list,
+ auth_info=auth_info)
+ req_pdu = samba.ndr.ndr_pack(req)
+
+ auth_pad_ok = len(req_pdu)
+ auth_pad_ok -= dcerpc.DCERPC_REQUEST_LENGTH
+ auth_pad_ok -= dcerpc.DCERPC_AUTH_TRAILER_LENGTH
+ auth_pad_ok -= len(to_server)
+
+ auth_info = self.generate_auth(auth_type=auth_type,
+ auth_level=auth_level,
+ auth_context_id=auth_context_id,
+ auth_pad_length=auth_pad_ok,
+ auth_blob=to_server)
+
+ req = self.generate_bind(call_id=0,
+ ctx_list=ctx_list,
+ auth_info=auth_info)
+ self.send_pdu(req)
+ rep = self.recv_pdu()
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id)
+ self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag)
+ self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag)
+ self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id)
+ self.assertEquals(rep.u.secondary_address_size, 4)
+ self.assertEquals(rep.u.secondary_address, "%d" % self.tcp_port)
+ self.assertEquals(len(rep.u._pad1), 2)
+ #self.assertEquals(rep.u._pad1, '\0' * 2)
+ self.assertEquals(rep.u.num_results, 1)
+ self.assertEquals(rep.u.ctx_list[0].result,
+ dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE)
+ self.assertEquals(rep.u.ctx_list[0].reason,
+ dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED)
+ self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32)
+ self.assertNotEquals(len(rep.u.auth_info), 0)
+ a = self.parse_auth(rep.u.auth_info)
+
+ from_server = a.credentials
+ (finished, to_server) = g.update(from_server)
+ self.assertFalse(finished)
+
+ auth_info = self.generate_auth(auth_type=auth_type,
+ auth_level=auth_level,
+ auth_context_id=auth_context_id,
+ auth_blob=to_server)
+ req = self.generate_alter(call_id=0,
+ ctx_list=ctx_list,
+ assoc_group_id=rep.u.assoc_group_id,
+ auth_info=auth_info)
+ req_pdu = samba.ndr.ndr_pack(req)
+
+ auth_pad_ok = len(req_pdu)
+ auth_pad_ok -= dcerpc.DCERPC_REQUEST_LENGTH
+ auth_pad_ok -= dcerpc.DCERPC_AUTH_TRAILER_LENGTH
+ auth_pad_ok -= len(to_server)
+ auth_pad_bad = auth_pad_ok + 1
+ auth_info = self.generate_auth(auth_type=auth_type,
+ auth_level=auth_level,
+ auth_context_id=auth_context_id,
+ auth_pad_length=auth_pad_bad,
+ auth_blob=to_server)
+ req = self.generate_alter(call_id=0,
+ ctx_list=ctx_list,
+ assoc_group_id=rep.u.assoc_group_id,
+ auth_info=auth_info)
+ self.send_pdu(req)
+ rep = self.recv_pdu()
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_FAULT, req.call_id,
+ pfc_flags=req.pfc_flags |
+ dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE,
+ auth_length=0)
+ self.assertNotEquals(rep.u.alloc_hint, 0)
+ self.assertEquals(rep.u.context_id, 0)
+ self.assertEquals(rep.u.cancel_count, 0)
+ self.assertEquals(rep.u.status, dcerpc.DCERPC_NCA_S_PROTO_ERROR)
+ self.assertEquals(len(rep.u._pad), 4)
+ self.assertEquals(rep.u._pad, '\0' * 4)
+
+ # wait for a disconnect
+ rep = self.recv_pdu()
+ self.assertIsNone(rep)
+ self.assertNotConnected()
+
+ def test_ntlmssp_auth_pad_ok(self):
+ ndr32 = base.transfer_syntax_ndr()
+
+ tsf1_list = [ndr32]
+ ctx1 = dcerpc.ctx_list()
+ ctx1.context_id = 1
+ ctx1.num_transfer_syntaxes = len(tsf1_list)
+ ctx1.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax()
+ ctx1.transfer_syntaxes = tsf1_list
+ ctx_list = [ctx1]
+
+ c = Credentials()
+ c.set_anonymous()
+ g = gensec.Security.start_client(self.settings)
+ g.set_credentials(c)
+ g.want_feature(gensec.FEATURE_DCE_STYLE)
+ auth_type = dcerpc.DCERPC_AUTH_TYPE_NTLMSSP
+ auth_level = dcerpc.DCERPC_AUTH_LEVEL_CONNECT
+ auth_context_id = 2
+ g.start_mech_by_authtype(auth_type, auth_level)
+ from_server = ""
+ (finished, to_server) = g.update(from_server)
+ self.assertFalse(finished)
+
+ auth_info = self.generate_auth(auth_type=auth_type,
+ auth_level=auth_level,
+ auth_context_id=auth_context_id,
+ auth_blob=to_server)
+
+ req = self.generate_bind(call_id=0,
+ ctx_list=ctx_list,
+ auth_info=auth_info)
+ req_pdu = samba.ndr.ndr_pack(req)
+
+ auth_pad_ok = len(req_pdu)
+ auth_pad_ok -= dcerpc.DCERPC_REQUEST_LENGTH
+ auth_pad_ok -= dcerpc.DCERPC_AUTH_TRAILER_LENGTH
+ auth_pad_ok -= len(to_server)
+
+ auth_info = self.generate_auth(auth_type=auth_type,
+ auth_level=auth_level,
+ auth_context_id=auth_context_id,
+ auth_pad_length=auth_pad_ok,
+ auth_blob=to_server)
+
+ req = self.generate_bind(call_id=0,
+ ctx_list=ctx_list,
+ auth_info=auth_info)
+ self.send_pdu(req)
+ rep = self.recv_pdu()
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id)
+ self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag)
+ self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag)
+ self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id)
+ self.assertEquals(rep.u.secondary_address_size, 4)
+ self.assertEquals(rep.u.secondary_address, "%d" % self.tcp_port)
+ self.assertEquals(len(rep.u._pad1), 2)
+ #self.assertEquals(rep.u._pad1, '\0' * 2)
+ self.assertEquals(rep.u.num_results, 1)
+ self.assertEquals(rep.u.ctx_list[0].result,
+ dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE)
+ self.assertEquals(rep.u.ctx_list[0].reason,
+ dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED)
+ self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32)
+ self.assertNotEquals(len(rep.u.auth_info), 0)
+ a = self.parse_auth(rep.u.auth_info)
+
+ from_server = a.credentials
+ (finished, to_server) = g.update(from_server)
+ self.assertTrue(finished)
+
+ auth_pad_ok = 0
+ auth_info = self.generate_auth(auth_type=auth_type,
+ auth_level=auth_level,
+ auth_context_id=auth_context_id,
+ auth_pad_length=auth_pad_ok,
+ auth_blob=to_server)
+ req = self.generate_auth3(call_id=0,
+ auth_info=auth_info)
+ self.send_pdu(req)
+ self.assertIsConnected()
+
+ # And now try a request without auth_info
+ req = self.generate_request(call_id = 2,
+ context_id=ctx1.context_id,
+ opnum=0,
+ stub="")
+ self.send_pdu(req)
+ rep = self.recv_pdu()
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
+ auth_length=0)
+ self.assertNotEquals(rep.u.alloc_hint, 0)
+ self.assertEquals(rep.u.context_id, req.u.context_id)
+ self.assertEquals(rep.u.cancel_count, 0)
+ self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
+
+ # Now a request with auth_info DCERPC_AUTH_LEVEL_CONNECT
+ auth_info = self.generate_auth(auth_type=auth_type,
+ auth_level=auth_level,
+ auth_context_id=auth_context_id,
+ auth_blob="\x01"+"\x00"*15)
+ req = self.generate_request(call_id = 3,
+ context_id=ctx1.context_id,
+ opnum=0,
+ stub="",
+ auth_info=auth_info)
+ self.send_pdu(req)
+ rep = self.recv_pdu()
+ # We don't get an auth_info back
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
+ auth_length=0)
+ self.assertNotEquals(rep.u.alloc_hint, 0)
+ self.assertEquals(rep.u.context_id, req.u.context_id)
+ self.assertEquals(rep.u.cancel_count, 0)
+ self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
+
+ self._disconnect("disconnect")
+ self.assertNotConnected()
+
+ def test_ntlmssp_auth_pad_fail_auth3(self):
+ ndr32 = base.transfer_syntax_ndr()
+
+ tsf1_list = [ndr32]
+ ctx1 = dcerpc.ctx_list()
+ ctx1.context_id = 1
+ ctx1.num_transfer_syntaxes = len(tsf1_list)
+ ctx1.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax()
+ ctx1.transfer_syntaxes = tsf1_list
+ ctx_list = [ctx1]
+
+ c = Credentials()
+ c.set_anonymous()
+ g = gensec.Security.start_client(self.settings)
+ g.set_credentials(c)
+ g.want_feature(gensec.FEATURE_DCE_STYLE)
+ auth_type = dcerpc.DCERPC_AUTH_TYPE_NTLMSSP
+ auth_level = dcerpc.DCERPC_AUTH_LEVEL_CONNECT
+ auth_context_id = 2
+ g.start_mech_by_authtype(auth_type, auth_level)
+ from_server = ""
+ (finished, to_server) = g.update(from_server)
+ self.assertFalse(finished)
+
+ auth_info = self.generate_auth(auth_type=auth_type,
+ auth_level=auth_level,
+ auth_context_id=auth_context_id,
+ auth_blob=to_server)
+
+ req = self.generate_bind(call_id=0,
+ ctx_list=ctx_list,
+ auth_info=auth_info)
+ req_pdu = samba.ndr.ndr_pack(req)
+
+ auth_pad_ok = len(req_pdu)
+ auth_pad_ok -= dcerpc.DCERPC_REQUEST_LENGTH
+ auth_pad_ok -= dcerpc.DCERPC_AUTH_TRAILER_LENGTH
+ auth_pad_ok -= len(to_server)
+
+ auth_info = self.generate_auth(auth_type=auth_type,
+ auth_level=auth_level,
+ auth_context_id=auth_context_id,
+ auth_pad_length=auth_pad_ok,
+ auth_blob=to_server)
+
+ req = self.generate_bind(call_id=0,
+ ctx_list=ctx_list,
+ auth_info=auth_info)
+ self.send_pdu(req)
+ rep = self.recv_pdu()
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id)
+ self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag)
+ self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag)
+ self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id)
+ self.assertEquals(rep.u.secondary_address_size, 4)
+ self.assertEquals(rep.u.secondary_address, "%d" % self.tcp_port)
+ self.assertEquals(len(rep.u._pad1), 2)
+ #self.assertEquals(rep.u._pad1, '\0' * 2)
+ self.assertEquals(rep.u.num_results, 1)
+ self.assertEquals(rep.u.ctx_list[0].result,
+ dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE)
+ self.assertEquals(rep.u.ctx_list[0].reason,
+ dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED)
+ self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32)
+ self.assertNotEquals(len(rep.u.auth_info), 0)
+ a = self.parse_auth(rep.u.auth_info)
+
+ from_server = a.credentials
+ (finished, to_server) = g.update(from_server)
+ self.assertTrue(finished)
+
+ auth_pad_bad = 1
+ auth_info = self.generate_auth(auth_type=auth_type,
+ auth_level=auth_level,
+ auth_context_id=auth_context_id,
+ auth_pad_length=auth_pad_bad,
+ auth_blob=to_server)
+ req = self.generate_auth3(call_id=0,
+ auth_info=auth_info)
+ self.send_pdu(req)
+ rep = self.recv_pdu()
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_FAULT, req.call_id,
+ pfc_flags=req.pfc_flags |
+ dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE,
+ auth_length=0)
+ self.assertNotEquals(rep.u.alloc_hint, 0)
+ self.assertEquals(rep.u.context_id, 0)
+ self.assertEquals(rep.u.cancel_count, 0)
+ self.assertEquals(rep.u.status, dcerpc.DCERPC_NCA_S_FAULT_REMOTE_NO_MEMORY)
+ self.assertEquals(len(rep.u._pad), 4)
+ self.assertEquals(rep.u._pad, '\0' * 4)
+
+ # wait for a disconnect
+ rep = self.recv_pdu()
+ self.assertIsNone(rep)
+ self.assertNotConnected()
+
if __name__ == "__main__":
global_ndr_print = True
global_hexdump = True