diff options
Diffstat (limited to 'Lib/test/test_xmlrpc.py')
| -rw-r--r-- | Lib/test/test_xmlrpc.py | 373 | 
1 files changed, 329 insertions, 44 deletions
| diff --git a/Lib/test/test_xmlrpc.py b/Lib/test/test_xmlrpc.py index 9833f2674f..102f892d47 100644 --- a/Lib/test/test_xmlrpc.py +++ b/Lib/test/test_xmlrpc.py @@ -5,13 +5,19 @@ import time  import unittest  import xmlrpc.client as xmlrpclib  import xmlrpc.server -import threading  import http.client  import socket  import os  import re +import io +import contextlib  from test import support +try: +    import threading +except ImportError: +    threading = None +  alist = [{'astring': 'foo@bar.baz.spam',            'afloat': 7283.43,            'anint': 2**20, @@ -73,11 +79,11 @@ class XMLRPCTestCase(unittest.TestCase):          d = xmlrpclib.DateTime()          ((new_d,), dummy) = xmlrpclib.loads(xmlrpclib.dumps((d,),                                              methodresponse=True)) -        self.assertTrue(isinstance(new_d.value, str)) +        self.assertIsInstance(new_d.value, str)          # Check that the output of dumps() is still an 8-bit string          s = xmlrpclib.dumps((new_d,), methodresponse=True) -        self.assertTrue(isinstance(s, str)) +        self.assertIsInstance(s, str)      def test_newstyle_class(self):          class T(object): @@ -259,7 +265,7 @@ ADDR = PORT = URL = None  # The evt is set twice.  First when the server is ready to serve.  # Second when the server has been shutdown.  The user must clear  # the event after it has been set the first time to catch the second set. -def http_server(evt, numrequests): +def http_server(evt, numrequests, requestHandler=None):      class TestInstanceClass:          def div(self, x, y):              return x // y @@ -280,7 +286,9 @@ def http_server(evt, numrequests):              s.setblocking(True)              return s, port -    serv = MyXMLRPCServer(("localhost", 0), +    if not requestHandler: +        requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler +    serv = MyXMLRPCServer(("localhost", 0), requestHandler,                            logRequests=False, bind_and_activate=False)      try:          serv.server_bind() @@ -312,6 +320,66 @@ def http_server(evt, numrequests):          PORT = None          evt.set() +def http_multi_server(evt, numrequests, requestHandler=None): +    class TestInstanceClass: +        def div(self, x, y): +            return x // y + +        def _methodHelp(self, name): +            if name == 'div': +                return 'This is the div function' + +    def my_function(): +        '''This is my function''' +        return True + +    class MyXMLRPCServer(xmlrpc.server.MultiPathXMLRPCServer): +        def get_request(self): +            # Ensure the socket is always non-blocking.  On Linux, socket +            # attributes are not inherited like they are on *BSD and Windows. +            s, port = self.socket.accept() +            s.setblocking(True) +            return s, port + +    if not requestHandler: +        requestHandler = xmlrpc.server.SimpleXMLRPCRequestHandler +    class MyRequestHandler(requestHandler): +        rpc_paths = [] + +    serv = MyXMLRPCServer(("localhost", 0), MyRequestHandler, +                          logRequests=False, bind_and_activate=False) +    serv.socket.settimeout(3) +    serv.server_bind() +    try: +        global ADDR, PORT, URL +        ADDR, PORT = serv.socket.getsockname() +        #connect to IP address directly.  This avoids socket.create_connection() +        #trying to connect to to "localhost" using all address families, which +        #causes slowdown e.g. on vista which supports AF_INET6.  The server listens +        #on AF_INET only. +        URL = "http://%s:%d"%(ADDR, PORT) +        serv.server_activate() +        paths = ["/foo", "/foo/bar"] +        for path in paths: +            d = serv.add_dispatcher(path, xmlrpc.server.SimpleXMLRPCDispatcher()) +            d.register_introspection_functions() +            d.register_multicall_functions() +        serv.get_dispatcher(paths[0]).register_function(pow) +        serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add') +        evt.set() + +        # handle up to 'numrequests' requests +        while numrequests > 0: +            serv.handle_request() +            numrequests -= 1 + +    except socket.timeout: +        pass +    finally: +        serv.socket.close() +        PORT = None +        evt.set() +  # This function prevents errors like:  #    <ProtocolError for localhost:57527/RPC2: 500 Internal Server Error>  def is_unavailable_exception(e): @@ -331,15 +399,36 @@ def is_unavailable_exception(e):      if exc_mess and 'temporarily unavailable' in exc_mess.lower():          return True -class SimpleServerTestCase(unittest.TestCase): +def make_request_and_skipIf(condition, reason): +    # If we skip the test, we have to make a request because the +    # the server created in setUp blocks expecting one to come in. +    if not condition: +        return lambda func: func +    def decorator(func): +        def make_request_and_skip(self): +            try: +                xmlrpclib.ServerProxy(URL).my_function() +            except (xmlrpclib.ProtocolError, socket.error) as e: +                if not is_unavailable_exception(e): +                    raise +            raise unittest.SkipTest(reason) +        return make_request_and_skip +    return decorator + +@unittest.skipUnless(threading, 'Threading required for this test.') +class BaseServerTestCase(unittest.TestCase): +    requestHandler = None +    request_count = 1 +    threadFunc = staticmethod(http_server) +      def setUp(self):          # enable traceback reporting          xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True          self.evt = threading.Event()          # start server thread to handle requests -        serv_args = (self.evt, 1) -        threading.Thread(target=http_server, args=serv_args).start() +        serv_args = (self.evt, self.request_count, self.requestHandler) +        threading.Thread(target=self.threadFunc, args=serv_args).start()          # wait for the server to be ready          self.evt.wait() @@ -348,6 +437,7 @@ class SimpleServerTestCase(unittest.TestCase):      def tearDown(self):          # wait on the server thread to terminate          self.evt.wait(4.0) +        # XXX this code does not work, and in fact stop_serving doesn't exist.          if not self.evt.is_set():              self.evt.set()              stop_serving() @@ -356,6 +446,7 @@ class SimpleServerTestCase(unittest.TestCase):          # disable traceback reporting          xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False +class SimpleServerTestCase(BaseServerTestCase):      def test_simple1(self):          try:              p = xmlrpclib.ServerProxy(URL) @@ -418,6 +509,8 @@ class SimpleServerTestCase(unittest.TestCase):                  # protocol error; provide additional information in test output                  self.fail("%s\n%s" % (e, getattr(e, "headers", ""))) +    @make_request_and_skipIf(sys.flags.optimize >= 2, +                     "Docstrings are omitted with -O2 and above")      def test_introspection3(self):          try:              # test native doc @@ -491,6 +584,184 @@ class SimpleServerTestCase(unittest.TestCase):          # This avoids waiting for the socket timeout.          self.test_simple1() +class MultiPathServerTestCase(BaseServerTestCase): +    threadFunc = staticmethod(http_multi_server) +    request_count = 2 +    def test_path1(self): +        p = xmlrpclib.ServerProxy(URL+"/foo") +        self.assertEqual(p.pow(6,8), 6**8) +        self.assertRaises(xmlrpclib.Fault, p.add, 6, 8) +    def test_path2(self): +        p = xmlrpclib.ServerProxy(URL+"/foo/bar") +        self.assertEqual(p.add(6,8), 6+8) +        self.assertRaises(xmlrpclib.Fault, p.pow, 6, 8) + +#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism +#does indeed serve subsequent requests on the same connection +class BaseKeepaliveServerTestCase(BaseServerTestCase): +    #a request handler that supports keep-alive and logs requests into a +    #class variable +    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler): +        parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler +        protocol_version = 'HTTP/1.1' +        myRequests = [] +        def handle(self): +            self.myRequests.append([]) +            self.reqidx = len(self.myRequests)-1 +            return self.parentClass.handle(self) +        def handle_one_request(self): +            result = self.parentClass.handle_one_request(self) +            self.myRequests[self.reqidx].append(self.raw_requestline) +            return result + +    requestHandler = RequestHandler +    def setUp(self): +        #clear request log +        self.RequestHandler.myRequests = [] +        return BaseServerTestCase.setUp(self) + +#A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism +#does indeed serve subsequent requests on the same connection +class KeepaliveServerTestCase1(BaseKeepaliveServerTestCase): +    def test_two(self): +        p = xmlrpclib.ServerProxy(URL) +        #do three requests. +        self.assertEqual(p.pow(6,8), 6**8) +        self.assertEqual(p.pow(6,8), 6**8) +        self.assertEqual(p.pow(6,8), 6**8) +        p("close")() + +        #they should have all been handled by a single request handler +        self.assertEqual(len(self.RequestHandler.myRequests), 1) + +        #check that we did at least two (the third may be pending append +        #due to thread scheduling) +        self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2) + + +#test special attribute access on the serverproxy, through the __call__ +#function. +class KeepaliveServerTestCase2(BaseKeepaliveServerTestCase): +    #ask for two keepalive requests to be handled. +    request_count=2 + +    def test_close(self): +        p = xmlrpclib.ServerProxy(URL) +        #do some requests with close. +        self.assertEqual(p.pow(6,8), 6**8) +        self.assertEqual(p.pow(6,8), 6**8) +        self.assertEqual(p.pow(6,8), 6**8) +        p("close")() #this should trigger a new keep-alive request +        self.assertEqual(p.pow(6,8), 6**8) +        self.assertEqual(p.pow(6,8), 6**8) +        self.assertEqual(p.pow(6,8), 6**8) +        p("close")() + +        #they should have all been two request handlers, each having logged at least +        #two complete requests +        self.assertEqual(len(self.RequestHandler.myRequests), 2) +        self.assertGreaterEqual(len(self.RequestHandler.myRequests[-1]), 2) +        self.assertGreaterEqual(len(self.RequestHandler.myRequests[-2]), 2) + + +    def test_transport(self): +        p = xmlrpclib.ServerProxy(URL) +        #do some requests with close. +        self.assertEqual(p.pow(6,8), 6**8) +        p("transport").close() #same as above, really. +        self.assertEqual(p.pow(6,8), 6**8) +        p("close")() +        self.assertEqual(len(self.RequestHandler.myRequests), 2) + +#A test case that verifies that gzip encoding works in both directions +#(for a request and the response) +class GzipServerTestCase(BaseServerTestCase): +    #a request handler that supports keep-alive and logs requests into a +    #class variable +    class RequestHandler(xmlrpc.server.SimpleXMLRPCRequestHandler): +        parentClass = xmlrpc.server.SimpleXMLRPCRequestHandler +        protocol_version = 'HTTP/1.1' + +        def do_POST(self): +            #store content of last request in class +            self.__class__.content_length = int(self.headers["content-length"]) +            return self.parentClass.do_POST(self) +    requestHandler = RequestHandler + +    class Transport(xmlrpclib.Transport): +        #custom transport, stores the response length for our perusal +        fake_gzip = False +        def parse_response(self, response): +            self.response_length=int(response.getheader("content-length", 0)) +            return xmlrpclib.Transport.parse_response(self, response) + +        def send_content(self, connection, body): +            if self.fake_gzip: +                #add a lone gzip header to induce decode error remotely +                connection.putheader("Content-Encoding", "gzip") +            return xmlrpclib.Transport.send_content(self, connection, body) + +    def setUp(self): +        BaseServerTestCase.setUp(self) + +    def test_gzip_request(self): +        t = self.Transport() +        t.encode_threshold = None +        p = xmlrpclib.ServerProxy(URL, transport=t) +        self.assertEqual(p.pow(6,8), 6**8) +        a = self.RequestHandler.content_length +        t.encode_threshold = 0 #turn on request encoding +        self.assertEqual(p.pow(6,8), 6**8) +        b = self.RequestHandler.content_length +        self.assertTrue(a>b) +        p("close")() + +    def test_bad_gzip_request(self): +        t = self.Transport() +        t.encode_threshold = None +        t.fake_gzip = True +        p = xmlrpclib.ServerProxy(URL, transport=t) +        cm = self.assertRaisesRegex(xmlrpclib.ProtocolError, +                                    re.compile(r"\b400\b")) +        with cm: +            p.pow(6, 8) +        p("close")() + +    def test_gsip_response(self): +        t = self.Transport() +        p = xmlrpclib.ServerProxy(URL, transport=t) +        old = self.requestHandler.encode_threshold +        self.requestHandler.encode_threshold = None #no encoding +        self.assertEqual(p.pow(6,8), 6**8) +        a = t.response_length +        self.requestHandler.encode_threshold = 0 #always encode +        self.assertEqual(p.pow(6,8), 6**8) +        p("close")() +        b = t.response_length +        self.requestHandler.encode_threshold = old +        self.assertTrue(a>b) + +#Test special attributes of the ServerProxy object +class ServerProxyTestCase(unittest.TestCase): +    def setUp(self): +        unittest.TestCase.setUp(self) +        if threading: +            self.url = URL +        else: +            # Without threading, http_server() and http_multi_server() will not +            # be executed and URL is still equal to None. 'http://' is a just +            # enough to choose the scheme (HTTP) +            self.url = 'http://' + +    def test_close(self): +        p = xmlrpclib.ServerProxy(self.url) +        self.assertEqual(p('close')(), None) + +    def test_transport(self): +        t = xmlrpclib.Transport() +        p = xmlrpclib.ServerProxy(self.url, transport=t) +        self.assertEqual(p('transport'), t) +  # This is a contrived way to make a failure occur on the server side  # in order to test the _send_traceback_header flag on the server  class FailingMessageClass(http.client.HTTPMessage): @@ -501,6 +772,7 @@ class FailingMessageClass(http.client.HTTPMessage):          return super().get(key, failobj) +@unittest.skipUnless(threading, 'Threading required for this test.')  class FailingServerTestCase(unittest.TestCase):      def setUp(self):          self.evt = threading.Event() @@ -576,6 +848,21 @@ class FailingServerTestCase(unittest.TestCase):          else:              self.fail('ProtocolError not raised') + +@contextlib.contextmanager +def captured_stdout(encoding='utf-8'): +    """A variation on support.captured_stdout() which gives a text stream +    having a `buffer` attribute. +    """ +    import io +    orig_stdout = sys.stdout +    sys.stdout = io.TextIOWrapper(io.BytesIO(), encoding=encoding) +    try: +        yield sys.stdout +    finally: +        sys.stdout = orig_stdout + +  class CGIHandlerTestCase(unittest.TestCase):      def setUp(self):          self.cgi = xmlrpc.server.CGIXMLRPCRequestHandler() @@ -588,54 +875,45 @@ class CGIHandlerTestCase(unittest.TestCase):              env['REQUEST_METHOD'] = 'GET'              # if the method is GET and no request_text is given, it runs handle_get              # get sysout output -            tmp = sys.stdout -            sys.stdout = open(support.TESTFN, "w") -            self.cgi.handle_request() -            sys.stdout.close() -            sys.stdout = tmp +            with captured_stdout(encoding=self.cgi.encoding) as data_out: +                self.cgi.handle_request()              # parse Status header -            handle = open(support.TESTFN, "r").read() +            data_out.seek(0) +            handle = data_out.read()              status = handle.split()[1]              message = ' '.join(handle.split()[2:4])              self.assertEqual(status, '400')              self.assertEqual(message, 'Bad Request') -            os.remove(support.TESTFN)      def test_cgi_xmlrpc_response(self):          data = """<?xml version='1.0'?> -<methodCall> -    <methodName>test_method</methodName> -    <params> -        <param> -            <value><string>foo</string></value> -        </param> -        <param> -            <value><string>bar</string></value> -        </param> -     </params> -</methodCall> -""" -        open("xmldata.txt", "w").write(data) -        tmp1 = sys.stdin -        tmp2 = sys.stdout - -        sys.stdin = open("xmldata.txt", "r") -        sys.stdout = open(support.TESTFN, "w") - -        with support.EnvironmentVarGuard() as env: +        <methodCall> +            <methodName>test_method</methodName> +            <params> +                <param> +                    <value><string>foo</string></value> +                </param> +                <param> +                    <value><string>bar</string></value> +                </param> +            </params> +        </methodCall> +        """ + +        with support.EnvironmentVarGuard() as env, \ +             captured_stdout(encoding=self.cgi.encoding) as data_out, \ +             support.captured_stdin() as data_in: +            data_in.write(data) +            data_in.seek(0)              env['CONTENT_LENGTH'] = str(len(data))              self.cgi.handle_request() - -        sys.stdin.close() -        sys.stdout.close() -        sys.stdin = tmp1 -        sys.stdout = tmp2 +        data_out.seek(0)          # will respond exception, if so, our goal is achieved ;) -        handle = open(support.TESTFN, "r").read() +        handle = data_out.read()          # start with 44th char so as not to get http header, we just          # need only xml @@ -652,14 +930,21 @@ class CGIHandlerTestCase(unittest.TestCase):              int(re.search('Content-Length: (\d+)', handle).group(1)),              len(content)) -        os.remove("xmldata.txt") -        os.remove(support.TESTFN) - +@support.reap_threads  def test_main():      xmlrpc_tests = [XMLRPCTestCase, HelperTestCase, DateTimeTestCase,           BinaryTestCase, FaultTestCase]      xmlrpc_tests.append(SimpleServerTestCase) +    xmlrpc_tests.append(KeepaliveServerTestCase1) +    xmlrpc_tests.append(KeepaliveServerTestCase2) +    try: +        import gzip +        xmlrpc_tests.append(GzipServerTestCase) +    except ImportError: +        pass #gzip not supported in this build +    xmlrpc_tests.append(MultiPathServerTestCase) +    xmlrpc_tests.append(ServerProxyTestCase)      xmlrpc_tests.append(FailingServerTestCase)      xmlrpc_tests.append(CGIHandlerTestCase) | 
