import thread import time import BaseHTTPServer from rdflib import Graph """ Test that correct content negoation headers are passed by graph.parse """ xmltestdoc=""" """ n3testdoc="""@prefix : . :a :b :c . """ nttestdoc=" .\n" class TestHTTPHandler(BaseHTTPServer.BaseHTTPRequestHandler): def do_GET(self): self.send_response(200, "OK") # fun fun fun parsing accept header. acs=self.headers["Accept"].split(",") acq=[x.split(";") for x in acs if ";" in x] acn=[(x,"q=1") for x in acs if ";" not in x] acs=[(x[0], float(x[1].strip()[2:])) for x in acq+acn] ac=sorted(acs, key=lambda x: x[1]) ct=ac[-1] if "application/rdf+xml" in ct: rct="application/rdf+xml" content=xmltestdoc elif "text/n3" in ct: rct="text/n3" content=n3testdoc elif "text/plain" in ct: rct="text/plain" content=nttestdoc self.send_header("Content-type",rct) self.end_headers() self.wfile.write(content.encode('utf-8')) def log_message(self, *args): pass def runHttpServer(server_class=BaseHTTPServer.HTTPServer, handler_class=TestHTTPHandler): """Start a server than can handle 3 requests :)""" server_address = ('localhost', 12345) httpd = server_class(server_address, handler_class) httpd.handle_request() httpd.handle_request() httpd.handle_request() def testConNeg(): thread.start_new_thread(runHttpServer, tuple()) # hang on a second while server starts time.sleep(1) graph=Graph() graph.parse("http://localhost:12345/foo", format="xml") graph.parse("http://localhost:12345/foo", format="n3") graph.parse("http://localhost:12345/foo", format="nt") if __name__ == "__main__": import sys import nose if len(sys.argv)==1: nose.main(defaultTest=sys.argv[0])