diff options
author | Stefan Metzmacher <metze@samba.org> | 2016-09-11 23:25:49 +0200 |
---|---|---|
committer | Andreas Schneider <asn@cryptomilk.org> | 2016-10-26 11:20:20 +0200 |
commit | 9ef8bfabc6dabf1b240d4bb556f5bd68ea05d69d (patch) | |
tree | f65f9dcf848e3c994d5f06be863e8133a2489148 /python | |
parent | 3c474cd4890a37c22b69f716164e2c830ab76c41 (diff) | |
download | samba-9ef8bfabc6dabf1b240d4bb556f5bd68ea05d69d.tar.gz |
python/tests: add simple dcerpc orphaned tests
ORPHANED is mostly ignored. It's up to the application server
implementation to install a orphaned handler.
Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Andreas Schneider <asn@samba.org>
Diffstat (limited to 'python')
-rwxr-xr-x | python/samba/tests/dcerpc/raw_protocol.py | 218 |
1 files changed, 218 insertions, 0 deletions
diff --git a/python/samba/tests/dcerpc/raw_protocol.py b/python/samba/tests/dcerpc/raw_protocol.py index 9220f06a56f..c49993b3871 100755 --- a/python/samba/tests/dcerpc/raw_protocol.py +++ b/python/samba/tests/dcerpc/raw_protocol.py @@ -2391,6 +2391,224 @@ class TestDCERPC_BIND(RawDCERPCTest): self.assertEquals(rep.u.cancel_count, 0) self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint) + def test_orphaned_no_request(self): + ndr32 = base.transfer_syntax_ndr() + abstract = samba.dcerpc.mgmt.abstract_syntax() + ctx = self.prepare_presentation(abstract, ndr32) + + req = self.generate_orphaned(call_id = 3) + self.send_pdu(req) + rep = self.recv_pdu(timeout=0.01) + self.assertIsNone(rep) + self.assertIsConnected() + + # And now try a request + req = self.generate_request(call_id = 1, + context_id=ctx.context_id, + opnum=0, + stub="") + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id, + auth_length=0) + self.assertNotEquals(rep.u.alloc_hint, 0) + self.assertEquals(rep.u.context_id, req.u.context_id & 0xff) + self.assertEquals(rep.u.cancel_count, 0) + self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint) + + def test_orphaned_request_after_first_last(self): + ndr32 = base.transfer_syntax_ndr() + abstract = samba.dcerpc.mgmt.abstract_syntax() + ctx = self.prepare_presentation(abstract, ndr32) + + req = self.generate_request(call_id = 1, + pfc_flags=dcerpc.DCERPC_PFC_FLAG_FIRST, + context_id=ctx.context_id, + opnum=0, + stub="") + self.send_pdu(req) + rep = self.recv_pdu(timeout=0.1) + self.assertIsNone(rep) + self.assertIsConnected() + + req = self.generate_orphaned(call_id = 1) + self.send_pdu(req) + rep = self.recv_pdu(timeout=0.1) + self.assertIsNone(rep) + self.assertIsConnected() + + req = self.generate_request(call_id = 1, + pfc_flags=dcerpc.DCERPC_PFC_FLAG_LAST, + context_id=ctx.context_id, + opnum=0, + stub="") + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id, + auth_length=0) + self.assertNotEquals(rep.u.alloc_hint, 0) + self.assertEquals(rep.u.context_id, req.u.context_id & 0xff) + self.assertEquals(rep.u.cancel_count, 0) + self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint) + + # And now try a request + req = self.generate_request(call_id = 2, + context_id=ctx.context_id, + opnum=0, + stub="") + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id, + auth_length=0) + self.assertNotEquals(rep.u.alloc_hint, 0) + self.assertEquals(rep.u.context_id, req.u.context_id & 0xff) + self.assertEquals(rep.u.cancel_count, 0) + self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint) + + def test_orphaned_request_after_first_mpx_last(self): + ndr32 = base.transfer_syntax_ndr() + abstract = samba.dcerpc.mgmt.abstract_syntax() + + pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST + pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST + pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_CONC_MPX + ctx = self.prepare_presentation(abstract, ndr32, pfc_flags=pfc_flags) + + req = self.generate_request(call_id = 1, + pfc_flags=dcerpc.DCERPC_PFC_FLAG_FIRST, + context_id=ctx.context_id, + opnum=0, + stub="") + self.send_pdu(req) + rep = self.recv_pdu(timeout=0.1) + self.assertIsNone(rep) + self.assertIsConnected() + + req = self.generate_orphaned(call_id = 1) + self.send_pdu(req) + rep = self.recv_pdu(timeout=0.1) + self.assertIsNone(rep) + self.assertIsConnected() + + req = self.generate_request(call_id = 1, + pfc_flags=dcerpc.DCERPC_PFC_FLAG_LAST, + context_id=ctx.context_id, + opnum=0, + stub="") + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id, + auth_length=0) + self.assertNotEquals(rep.u.alloc_hint, 0) + self.assertEquals(rep.u.context_id, req.u.context_id & 0xff) + self.assertEquals(rep.u.cancel_count, 0) + self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint) + + # And now try a request + req = self.generate_request(call_id = 2, + context_id=ctx.context_id, + opnum=0, + stub="") + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id, + auth_length=0) + self.assertNotEquals(rep.u.alloc_hint, 0) + self.assertEquals(rep.u.context_id, req.u.context_id & 0xff) + self.assertEquals(rep.u.cancel_count, 0) + self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint) + + def test_orphaned_request_after_first_no_last(self): + ndr32 = base.transfer_syntax_ndr() + abstract = samba.dcerpc.mgmt.abstract_syntax() + ctx = self.prepare_presentation(abstract, ndr32) + + req1 = self.generate_request(call_id = 1, + pfc_flags=dcerpc.DCERPC_PFC_FLAG_FIRST, + context_id=ctx.context_id, + opnum=0, + stub="") + self.send_pdu(req1) + rep = self.recv_pdu(timeout=0.1) + self.assertIsNone(rep) + self.assertIsConnected() + + req = self.generate_orphaned(call_id = 1) + self.send_pdu(req) + rep = self.recv_pdu(timeout=0.1) + self.assertIsNone(rep) + self.assertIsConnected() + + # And now try a new request + req2 = self.generate_request(call_id = 2, + context_id=ctx.context_id, + opnum=0, + stub="") + self.send_pdu(req2) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_FAULT, req1.call_id, + auth_length=0) + self.assertNotEquals(rep.u.alloc_hint, 0) + self.assertEquals(rep.u.context_id, req1.u.context_id) + self.assertEquals(rep.u.cancel_count, 0) + self.assertEquals(rep.u.flags, 0) + self.assertEquals(rep.u.status, dcerpc.DCERPC_NCA_S_PROTO_ERROR) + self.assertEquals(rep.u.reserved, 0) + self.assertEquals(len(rep.u.error_and_verifier), 0) + + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + + def test_orphaned_request_after_first_mpx_no_last(self): + ndr32 = base.transfer_syntax_ndr() + abstract = samba.dcerpc.mgmt.abstract_syntax() + + pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST + pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST + pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_CONC_MPX + ctx = self.prepare_presentation(abstract, ndr32, + pfc_flags=pfc_flags) + + req1 = self.generate_request(call_id = 1, + pfc_flags=dcerpc.DCERPC_PFC_FLAG_FIRST, + context_id=ctx.context_id, + opnum=0, + stub="") + self.send_pdu(req1) + rep = self.recv_pdu(timeout=0.1) + self.assertIsNone(rep) + self.assertIsConnected() + + req = self.generate_orphaned(call_id = 1) + self.send_pdu(req) + rep = self.recv_pdu(timeout=0.1) + self.assertIsNone(rep) + self.assertIsConnected() + + # And now try a new request + req2 = self.generate_request(call_id = 2, + context_id=ctx.context_id-1, + opnum=0, + stub="") + self.send_pdu(req2) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_FAULT, req2.call_id, + auth_length=0) + self.assertNotEquals(rep.u.alloc_hint, 0) + self.assertEquals(rep.u.context_id, 0) + self.assertEquals(rep.u.cancel_count, 0) + self.assertEquals(rep.u.flags, 0) + self.assertEquals(rep.u.status, dcerpc.DCERPC_NCA_S_PROTO_ERROR) + self.assertEquals(rep.u.reserved, 0) + self.assertEquals(len(rep.u.error_and_verifier), 0) + + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + def test_spnego_connect_request(self): ndr32 = base.transfer_syntax_ndr() |