from unittest import TestCase, TestSuite, makeSuite from wsgiref import util from wsgiref.tests import compare_generic_iter from StringIO import StringIO class UtilityTests(TestCase): def checkShift(self,sn_in,pi_in,part,sn_out,pi_out): env = {'SCRIPT_NAME':sn_in,'PATH_INFO':pi_in} util.setup_testing_defaults(env) self.assertEqual(util.shift_path_info(env),part) self.assertEqual(env['PATH_INFO'],pi_out) self.assertEqual(env['SCRIPT_NAME'],sn_out) return env def checkDefault(self, key, value, alt=None): # Check defaulting when empty env = {} util.setup_testing_defaults(env) if isinstance(value,StringIO): self.failUnless(isinstance(env[key],StringIO)) else: self.assertEqual(env[key],value) # Check existing value env = {key:alt} util.setup_testing_defaults(env) self.failUnless(env[key] is alt) def checkCrossDefault(self,key,value,**kw): util.setup_testing_defaults(kw) self.assertEqual(kw[key],value) def checkAppURI(self,uri,**kw): util.setup_testing_defaults(kw) self.assertEqual(util.application_uri(kw),uri) def checkReqURI(self,uri,query=1,**kw): util.setup_testing_defaults(kw) self.assertEqual(util.request_uri(kw,query),uri) def checkFW(self,text,size,match): def make_it(text=text,size=size): return util.FileWrapper(StringIO(text),size) compare_generic_iter(make_it,match) it = make_it() self.failIf(it.filelike.closed) for item in it: pass self.failIf(it.filelike.closed) it.close() self.failUnless(it.filelike.closed) def testSimpleShifts(self): self.checkShift('','/', '', '/', '') self.checkShift('','/x', 'x', '/x', '') self.checkShift('/','', None, '/', '') self.checkShift('/a','/x/y', 'x', '/a/x', '/y') self.checkShift('/a','/x/', 'x', '/a/x', '/') def testNormalizedShifts(self): self.checkShift('/a/b', '/../y', '..', '/a', '/y') self.checkShift('', '/../y', '..', '', '/y') self.checkShift('/a/b', '//y', 'y', '/a/b/y', '') self.checkShift('/a/b', '//y/', 'y', '/a/b/y', '/') self.checkShift('/a/b', '/./y', 'y', '/a/b/y', '') self.checkShift('/a/b', '/./y/', 'y', '/a/b/y', '/') self.checkShift('/a/b', '///./..//y/.//', '..', '/a', '/y/') self.checkShift('/a/b', '///', '', '/a/b/', '') self.checkShift('/a/b', '/.//', '', '/a/b/', '') self.checkShift('/a/b', '/x//', 'x', '/a/b/x', '/') self.checkShift('/a/b', '/.', None, '/a/b', '') def testDefaults(self): for key, value in [ ('SERVER_NAME','127.0.0.1'), ('SERVER_PORT', '80'), ('SERVER_PROTOCOL','HTTP/1.0'), ('HTTP_HOST','127.0.0.1'), ('REQUEST_METHOD','GET'), ('SCRIPT_NAME',''), ('PATH_INFO','/'), ('wsgi.version', (1,0)), ('wsgi.run_once', 0), ('wsgi.multithread', 0), ('wsgi.multiprocess', 0), ('wsgi.input', StringIO("")), ('wsgi.errors', StringIO()), ('wsgi.url_scheme','http'), ]: self.checkDefault(key,value) def testCrossDefaults(self): self.checkCrossDefault('HTTP_HOST',"foo.bar",SERVER_NAME="foo.bar") self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="on") self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="1") self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="yes") self.checkCrossDefault('wsgi.url_scheme',"http",HTTPS="foo") self.checkCrossDefault('SERVER_PORT',"80",HTTPS="foo") self.checkCrossDefault('SERVER_PORT',"443",HTTPS="on") def testGuessScheme(self): self.assertEqual(util.guess_scheme({}), "http") self.assertEqual(util.guess_scheme({'HTTPS':"foo"}), "http") self.assertEqual(util.guess_scheme({'HTTPS':"on"}), "https") self.assertEqual(util.guess_scheme({'HTTPS':"yes"}), "https") self.assertEqual(util.guess_scheme({'HTTPS':"1"}), "https") def testAppURIs(self): self.checkAppURI("http://127.0.0.1/") self.checkAppURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam") self.checkAppURI("http://spam.example.com:2071/", HTTP_HOST="spam.example.com:2071", SERVER_PORT="2071") self.checkAppURI("http://spam.example.com/", SERVER_NAME="spam.example.com") self.checkAppURI("http://127.0.0.1/", HTTP_HOST="127.0.0.1", SERVER_NAME="spam.example.com") self.checkAppURI("https://127.0.0.1/", HTTPS="on") self.checkAppURI("http://127.0.0.1:8000/", SERVER_PORT="8000", HTTP_HOST=None) def testReqURIs(self): self.checkReqURI("http://127.0.0.1/") self.checkReqURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam") self.checkReqURI("http://127.0.0.1/spammity/spam", SCRIPT_NAME="/spammity", PATH_INFO="/spam") self.checkReqURI("http://127.0.0.1/spammity/spam?say=ni", SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="say=ni") self.checkReqURI("http://127.0.0.1/spammity/spam", 0, SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="say=ni") def testFileWrapper(self): self.checkFW("xyz"*50, 120, ["xyz"*40,"xyz"*10]) def testHopByHop(self): for hop in ( "Connection Keep-Alive Proxy-Authenticate Proxy-Authorization " "TE Trailers Transfer-Encoding Upgrade" ).split(): for alt in hop, hop.title(), hop.upper(), hop.lower(): self.failUnless(util.is_hop_by_hop(alt)) # Not comprehensive, just a few random header names for hop in ( "Accept Cache-Control Date Pragma Trailer Via Warning" ).split(): for alt in hop, hop.title(), hop.upper(), hop.lower(): self.failIf(util.is_hop_by_hop(alt)) TestClasses = ( UtilityTests, ) def test_suite(): return TestSuite([makeSuite(t,'test') for t in TestClasses])