summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorStefan Metzmacher <metze@samba.org>2016-09-11 23:25:49 +0200
committerAndreas Schneider <asn@cryptomilk.org>2016-10-26 11:20:20 +0200
commit9ef8bfabc6dabf1b240d4bb556f5bd68ea05d69d (patch)
treef65f9dcf848e3c994d5f06be863e8133a2489148 /python
parent3c474cd4890a37c22b69f716164e2c830ab76c41 (diff)
downloadsamba-9ef8bfabc6dabf1b240d4bb556f5bd68ea05d69d.tar.gz
python/tests: add simple dcerpc orphaned tests
ORPHANED is mostly ignored. It's up to the application server implementation to install a orphaned handler. Signed-off-by: Stefan Metzmacher <metze@samba.org> Reviewed-by: Andreas Schneider <asn@samba.org>
Diffstat (limited to 'python')
-rwxr-xr-xpython/samba/tests/dcerpc/raw_protocol.py218
1 files changed, 218 insertions, 0 deletions
diff --git a/python/samba/tests/dcerpc/raw_protocol.py b/python/samba/tests/dcerpc/raw_protocol.py
index 9220f06a56f..c49993b3871 100755
--- a/python/samba/tests/dcerpc/raw_protocol.py
+++ b/python/samba/tests/dcerpc/raw_protocol.py
@@ -2391,6 +2391,224 @@ class TestDCERPC_BIND(RawDCERPCTest):
self.assertEquals(rep.u.cancel_count, 0)
self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
+ def test_orphaned_no_request(self):
+ ndr32 = base.transfer_syntax_ndr()
+ abstract = samba.dcerpc.mgmt.abstract_syntax()
+ ctx = self.prepare_presentation(abstract, ndr32)
+
+ req = self.generate_orphaned(call_id = 3)
+ self.send_pdu(req)
+ rep = self.recv_pdu(timeout=0.01)
+ self.assertIsNone(rep)
+ self.assertIsConnected()
+
+ # And now try a request
+ req = self.generate_request(call_id = 1,
+ context_id=ctx.context_id,
+ opnum=0,
+ stub="")
+ self.send_pdu(req)
+ rep = self.recv_pdu()
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
+ auth_length=0)
+ self.assertNotEquals(rep.u.alloc_hint, 0)
+ self.assertEquals(rep.u.context_id, req.u.context_id & 0xff)
+ self.assertEquals(rep.u.cancel_count, 0)
+ self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
+
+ def test_orphaned_request_after_first_last(self):
+ ndr32 = base.transfer_syntax_ndr()
+ abstract = samba.dcerpc.mgmt.abstract_syntax()
+ ctx = self.prepare_presentation(abstract, ndr32)
+
+ req = self.generate_request(call_id = 1,
+ pfc_flags=dcerpc.DCERPC_PFC_FLAG_FIRST,
+ context_id=ctx.context_id,
+ opnum=0,
+ stub="")
+ self.send_pdu(req)
+ rep = self.recv_pdu(timeout=0.1)
+ self.assertIsNone(rep)
+ self.assertIsConnected()
+
+ req = self.generate_orphaned(call_id = 1)
+ self.send_pdu(req)
+ rep = self.recv_pdu(timeout=0.1)
+ self.assertIsNone(rep)
+ self.assertIsConnected()
+
+ req = self.generate_request(call_id = 1,
+ pfc_flags=dcerpc.DCERPC_PFC_FLAG_LAST,
+ context_id=ctx.context_id,
+ opnum=0,
+ stub="")
+ self.send_pdu(req)
+ rep = self.recv_pdu()
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
+ auth_length=0)
+ self.assertNotEquals(rep.u.alloc_hint, 0)
+ self.assertEquals(rep.u.context_id, req.u.context_id & 0xff)
+ self.assertEquals(rep.u.cancel_count, 0)
+ self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
+
+ # And now try a request
+ req = self.generate_request(call_id = 2,
+ context_id=ctx.context_id,
+ opnum=0,
+ stub="")
+ self.send_pdu(req)
+ rep = self.recv_pdu()
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
+ auth_length=0)
+ self.assertNotEquals(rep.u.alloc_hint, 0)
+ self.assertEquals(rep.u.context_id, req.u.context_id & 0xff)
+ self.assertEquals(rep.u.cancel_count, 0)
+ self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
+
+ def test_orphaned_request_after_first_mpx_last(self):
+ ndr32 = base.transfer_syntax_ndr()
+ abstract = samba.dcerpc.mgmt.abstract_syntax()
+
+ pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST
+ pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST
+ pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_CONC_MPX
+ ctx = self.prepare_presentation(abstract, ndr32, pfc_flags=pfc_flags)
+
+ req = self.generate_request(call_id = 1,
+ pfc_flags=dcerpc.DCERPC_PFC_FLAG_FIRST,
+ context_id=ctx.context_id,
+ opnum=0,
+ stub="")
+ self.send_pdu(req)
+ rep = self.recv_pdu(timeout=0.1)
+ self.assertIsNone(rep)
+ self.assertIsConnected()
+
+ req = self.generate_orphaned(call_id = 1)
+ self.send_pdu(req)
+ rep = self.recv_pdu(timeout=0.1)
+ self.assertIsNone(rep)
+ self.assertIsConnected()
+
+ req = self.generate_request(call_id = 1,
+ pfc_flags=dcerpc.DCERPC_PFC_FLAG_LAST,
+ context_id=ctx.context_id,
+ opnum=0,
+ stub="")
+ self.send_pdu(req)
+ rep = self.recv_pdu()
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
+ auth_length=0)
+ self.assertNotEquals(rep.u.alloc_hint, 0)
+ self.assertEquals(rep.u.context_id, req.u.context_id & 0xff)
+ self.assertEquals(rep.u.cancel_count, 0)
+ self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
+
+ # And now try a request
+ req = self.generate_request(call_id = 2,
+ context_id=ctx.context_id,
+ opnum=0,
+ stub="")
+ self.send_pdu(req)
+ rep = self.recv_pdu()
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
+ auth_length=0)
+ self.assertNotEquals(rep.u.alloc_hint, 0)
+ self.assertEquals(rep.u.context_id, req.u.context_id & 0xff)
+ self.assertEquals(rep.u.cancel_count, 0)
+ self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
+
+ def test_orphaned_request_after_first_no_last(self):
+ ndr32 = base.transfer_syntax_ndr()
+ abstract = samba.dcerpc.mgmt.abstract_syntax()
+ ctx = self.prepare_presentation(abstract, ndr32)
+
+ req1 = self.generate_request(call_id = 1,
+ pfc_flags=dcerpc.DCERPC_PFC_FLAG_FIRST,
+ context_id=ctx.context_id,
+ opnum=0,
+ stub="")
+ self.send_pdu(req1)
+ rep = self.recv_pdu(timeout=0.1)
+ self.assertIsNone(rep)
+ self.assertIsConnected()
+
+ req = self.generate_orphaned(call_id = 1)
+ self.send_pdu(req)
+ rep = self.recv_pdu(timeout=0.1)
+ self.assertIsNone(rep)
+ self.assertIsConnected()
+
+ # And now try a new request
+ req2 = self.generate_request(call_id = 2,
+ context_id=ctx.context_id,
+ opnum=0,
+ stub="")
+ self.send_pdu(req2)
+ rep = self.recv_pdu()
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_FAULT, req1.call_id,
+ auth_length=0)
+ self.assertNotEquals(rep.u.alloc_hint, 0)
+ self.assertEquals(rep.u.context_id, req1.u.context_id)
+ self.assertEquals(rep.u.cancel_count, 0)
+ self.assertEquals(rep.u.flags, 0)
+ self.assertEquals(rep.u.status, dcerpc.DCERPC_NCA_S_PROTO_ERROR)
+ self.assertEquals(rep.u.reserved, 0)
+ self.assertEquals(len(rep.u.error_and_verifier), 0)
+
+ # wait for a disconnect
+ rep = self.recv_pdu()
+ self.assertIsNone(rep)
+ self.assertNotConnected()
+
+ def test_orphaned_request_after_first_mpx_no_last(self):
+ ndr32 = base.transfer_syntax_ndr()
+ abstract = samba.dcerpc.mgmt.abstract_syntax()
+
+ pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST
+ pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST
+ pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_CONC_MPX
+ ctx = self.prepare_presentation(abstract, ndr32,
+ pfc_flags=pfc_flags)
+
+ req1 = self.generate_request(call_id = 1,
+ pfc_flags=dcerpc.DCERPC_PFC_FLAG_FIRST,
+ context_id=ctx.context_id,
+ opnum=0,
+ stub="")
+ self.send_pdu(req1)
+ rep = self.recv_pdu(timeout=0.1)
+ self.assertIsNone(rep)
+ self.assertIsConnected()
+
+ req = self.generate_orphaned(call_id = 1)
+ self.send_pdu(req)
+ rep = self.recv_pdu(timeout=0.1)
+ self.assertIsNone(rep)
+ self.assertIsConnected()
+
+ # And now try a new request
+ req2 = self.generate_request(call_id = 2,
+ context_id=ctx.context_id-1,
+ opnum=0,
+ stub="")
+ self.send_pdu(req2)
+ rep = self.recv_pdu()
+ self.verify_pdu(rep, dcerpc.DCERPC_PKT_FAULT, req2.call_id,
+ auth_length=0)
+ self.assertNotEquals(rep.u.alloc_hint, 0)
+ self.assertEquals(rep.u.context_id, 0)
+ self.assertEquals(rep.u.cancel_count, 0)
+ self.assertEquals(rep.u.flags, 0)
+ self.assertEquals(rep.u.status, dcerpc.DCERPC_NCA_S_PROTO_ERROR)
+ self.assertEquals(rep.u.reserved, 0)
+ self.assertEquals(len(rep.u.error_and_verifier), 0)
+
+ # wait for a disconnect
+ rep = self.recv_pdu()
+ self.assertIsNone(rep)
+ self.assertNotConnected()
+
def test_spnego_connect_request(self):
ndr32 = base.transfer_syntax_ndr()