diff options
Diffstat (limited to 'tests/test_resolver.py')
-rw-r--r-- | tests/test_resolver.py | 184 |
1 files changed, 184 insertions, 0 deletions
diff --git a/tests/test_resolver.py b/tests/test_resolver.py new file mode 100644 index 0000000..6be955b --- /dev/null +++ b/tests/test_resolver.py @@ -0,0 +1,184 @@ +# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc. +# +# Permission to use, copy, modify, and distribute this software and its +# documentation for any purpose with or without fee is hereby granted, +# provided that the above copyright notice and this permission notice +# appear in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT +# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +import cStringIO +import select +import sys +import time +import unittest + +import dns.name +import dns.message +import dns.name +import dns.rdataclass +import dns.rdatatype +import dns.resolver + +resolv_conf = """ + /t/t +# comment 1 +; comment 2 +domain foo +nameserver 10.0.0.1 +nameserver 10.0.0.2 +""" + +message_text = """id 1234 +opcode QUERY +rcode NOERROR +flags QR AA RD +;QUESTION +example. IN A +;ANSWER +example. 1 IN A 10.0.0.1 +;AUTHORITY +;ADDITIONAL +""" + +class FakeAnswer(object): + def __init__(self, expiration): + self.expiration = expiration + +class BaseResolverTests(object): + + if sys.platform != 'win32': + def testRead(self): + f = cStringIO.StringIO(resolv_conf) + r = dns.resolver.Resolver(f) + self.failUnless(r.nameservers == ['10.0.0.1', '10.0.0.2'] and + r.domain == dns.name.from_text('foo')) + + def testCacheExpiration(self): + message = dns.message.from_text(message_text) + name = dns.name.from_text('example.') + answer = dns.resolver.Answer(name, dns.rdatatype.A, dns.rdataclass.IN, + message) + cache = dns.resolver.Cache() + cache.put((name, dns.rdatatype.A, dns.rdataclass.IN), answer) + time.sleep(2) + self.failUnless(cache.get((name, dns.rdatatype.A, dns.rdataclass.IN)) + is None) + + def testCacheCleaning(self): + message = dns.message.from_text(message_text) + name = dns.name.from_text('example.') + answer = dns.resolver.Answer(name, dns.rdatatype.A, dns.rdataclass.IN, + message) + cache = dns.resolver.Cache(cleaning_interval=1.0) + cache.put((name, dns.rdatatype.A, dns.rdataclass.IN), answer) + time.sleep(2) + self.failUnless(cache.get((name, dns.rdatatype.A, dns.rdataclass.IN)) + is None) + + def testZoneForName1(self): + name = dns.name.from_text('www.dnspython.org.') + ezname = dns.name.from_text('dnspython.org.') + zname = dns.resolver.zone_for_name(name) + self.failUnless(zname == ezname) + + def testZoneForName2(self): + name = dns.name.from_text('a.b.www.dnspython.org.') + ezname = dns.name.from_text('dnspython.org.') + zname = dns.resolver.zone_for_name(name) + self.failUnless(zname == ezname) + + def testZoneForName3(self): + name = dns.name.from_text('dnspython.org.') + ezname = dns.name.from_text('dnspython.org.') + zname = dns.resolver.zone_for_name(name) + self.failUnless(zname == ezname) + + def testZoneForName4(self): + def bad(): + name = dns.name.from_text('dnspython.org', None) + zname = dns.resolver.zone_for_name(name) + self.failUnlessRaises(dns.resolver.NotAbsolute, bad) + + def testLRUReplace(self): + cache = dns.resolver.LRUCache(4) + for i in xrange(0, 5): + name = dns.name.from_text('example%d.' % i) + answer = FakeAnswer(time.time() + 1) + cache.put((name, dns.rdatatype.A, dns.rdataclass.IN), answer) + for i in xrange(0, 5): + name = dns.name.from_text('example%d.' % i) + if i == 0: + self.failUnless(cache.get((name, dns.rdatatype.A, + dns.rdataclass.IN)) + is None) + else: + self.failUnless(not cache.get((name, dns.rdatatype.A, + dns.rdataclass.IN)) + is None) + + def testLRUDoesLRU(self): + cache = dns.resolver.LRUCache(4) + for i in xrange(0, 4): + name = dns.name.from_text('example%d.' % i) + answer = FakeAnswer(time.time() + 1) + cache.put((name, dns.rdatatype.A, dns.rdataclass.IN), answer) + name = dns.name.from_text('example0.') + cache.get((name, dns.rdatatype.A, dns.rdataclass.IN)) + # The LRU is now example1. + name = dns.name.from_text('example4.') + answer = FakeAnswer(time.time() + 1) + cache.put((name, dns.rdatatype.A, dns.rdataclass.IN), answer) + for i in xrange(0, 5): + name = dns.name.from_text('example%d.' % i) + if i == 1: + self.failUnless(cache.get((name, dns.rdatatype.A, + dns.rdataclass.IN)) + is None) + else: + self.failUnless(not cache.get((name, dns.rdatatype.A, + dns.rdataclass.IN)) + is None) + + def testLRUExpiration(self): + cache = dns.resolver.LRUCache(4) + for i in xrange(0, 4): + name = dns.name.from_text('example%d.' % i) + answer = FakeAnswer(time.time() + 1) + cache.put((name, dns.rdatatype.A, dns.rdataclass.IN), answer) + time.sleep(2) + for i in xrange(0, 4): + name = dns.name.from_text('example%d.' % i) + self.failUnless(cache.get((name, dns.rdatatype.A, + dns.rdataclass.IN)) + is None) + +class PollingMonkeyPatchMixin(object): + def setUp(self): + self.__native_polling_backend = dns.query._polling_backend + dns.query._set_polling_backend(self.polling_backend()) + + unittest.TestCase.setUp(self) + + def tearDown(self): + dns.query._set_polling_backend(self.__native_polling_backend) + + unittest.TestCase.tearDown(self) + +class SelectResolverTestCase(PollingMonkeyPatchMixin, BaseResolverTests, unittest.TestCase): + def polling_backend(self): + return dns.query._select_for + +if hasattr(select, 'poll'): + class PollResolverTestCase(PollingMonkeyPatchMixin, BaseResolverTests, unittest.TestCase): + def polling_backend(self): + return dns.query._poll_for + +if __name__ == '__main__': + unittest.main() |