diff options
author | Andrew Bartlett <abartlet@samba.org> | 2017-06-01 13:26:37 +1200 |
---|---|---|
committer | Andrew Bartlett <abartlet@samba.org> | 2017-06-10 21:48:21 +0200 |
commit | 9229809f75fbc5750679ebb238876a9825552619 (patch) | |
tree | 1ad2b8e7327bbc60c4dc2f50b20d61363d73eb8f /python/samba/tests/dns.py | |
parent | 11ba6f8cde2928a20969acb20e779db3ad4a9cce (diff) | |
download | samba-9229809f75fbc5750679ebb238876a9825552619.tar.gz |
selftest: Create new common base class for dns.py and dns_tkey.py
This will allow more DNS tests to be written in the future with less
code duplication.
Diffstat (limited to 'python/samba/tests/dns.py')
-rw-r--r-- | python/samba/tests/dns.py | 179 |
1 files changed, 1 insertions, 178 deletions
diff --git a/python/samba/tests/dns.py b/python/samba/tests/dns.py index ac82d5051aa..1b5b64da3a4 100644 --- a/python/samba/tests/dns.py +++ b/python/samba/tests/dns.py @@ -22,11 +22,11 @@ import random import socket import samba.ndr as ndr from samba import credentials, param -from samba.tests import TestCase from samba.dcerpc import dns, dnsp, dnsserver from samba.netcmd.dns import TXTRecord, dns_record_match, data_to_dns_record from samba.tests.subunitrun import SubunitOptions, TestProgram from samba import werror, WERRORError +from samba.tests.dns_base import DNSTest import samba.getopt as options import optparse @@ -60,183 +60,6 @@ server_name = args[0] server_ip = args[1] creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE) -class DNSTest(TestCase): - - def setUp(self): - super(DNSTest, self).setUp() - self.timeout = None - - def errstr(self, errcode): - "Return a readable error code" - string_codes = [ - "OK", - "FORMERR", - "SERVFAIL", - "NXDOMAIN", - "NOTIMP", - "REFUSED", - "YXDOMAIN", - "YXRRSET", - "NXRRSET", - "NOTAUTH", - "NOTZONE", - "0x0B", - "0x0C", - "0x0D", - "0x0E", - "0x0F", - "BADSIG", - "BADKEY" - ] - - return string_codes[errcode] - - def assert_rcode_equals(self, rcode, expected): - "Helper function to check return code" - self.assertEquals(rcode, expected, "Expected RCODE %s, got %s" % - (self.errstr(expected), self.errstr(rcode))) - - def assert_dns_rcode_equals(self, packet, rcode): - "Helper function to check return code" - p_errcode = packet.operation & 0x000F - self.assertEquals(p_errcode, rcode, "Expected RCODE %s, got %s" % - (self.errstr(rcode), self.errstr(p_errcode))) - - def assert_dns_opcode_equals(self, packet, opcode): - "Helper function to check opcode" - p_opcode = packet.operation & 0x7800 - self.assertEquals(p_opcode, opcode, "Expected OPCODE %s, got %s" % - (opcode, p_opcode)) - - def make_name_packet(self, opcode, qid=None): - "Helper creating a dns.name_packet" - p = dns.name_packet() - if qid is None: - p.id = random.randint(0x0, 0xff00) - p.operation = opcode - p.questions = [] - p.additional = [] - return p - - def finish_name_packet(self, packet, questions): - "Helper to finalize a dns.name_packet" - packet.qdcount = len(questions) - packet.questions = questions - - def make_name_question(self, name, qtype, qclass): - "Helper creating a dns.name_question" - q = dns.name_question() - q.name = name - q.question_type = qtype - q.question_class = qclass - return q - - def make_txt_record(self, records): - rdata_txt = dns.txt_record() - s_list = dnsp.string_list() - s_list.count = len(records) - s_list.str = records - rdata_txt.txt = s_list - return rdata_txt - - def get_dns_domain(self): - "Helper to get dns domain" - return self.creds.get_realm().lower() - - def dns_transaction_udp(self, packet, host, - dump=False, timeout=None): - "send a DNS query and read the reply" - s = None - if timeout is None: - timeout = self.timeout - try: - send_packet = ndr.ndr_pack(packet) - if dump: - print self.hexdump(send_packet) - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) - s.settimeout(timeout) - s.connect((host, 53)) - s.sendall(send_packet, 0) - recv_packet = s.recv(2048, 0) - if dump: - print self.hexdump(recv_packet) - response = ndr.ndr_unpack(dns.name_packet, recv_packet) - return (response, recv_packet) - finally: - if s is not None: - s.close() - - def dns_transaction_tcp(self, packet, host, - dump=False, timeout=None): - "send a DNS query and read the reply, also return the raw packet" - s = None - if timeout is None: - timeout = self.timeout - try: - send_packet = ndr.ndr_pack(packet) - if dump: - print self.hexdump(send_packet) - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) - s.settimeout(timeout) - s.connect((host, 53)) - tcp_packet = struct.pack('!H', len(send_packet)) - tcp_packet += send_packet - s.sendall(tcp_packet) - - recv_packet = s.recv(0xffff + 2, 0) - if dump: - print self.hexdump(recv_packet) - response = ndr.ndr_unpack(dns.name_packet, recv_packet[2:]) - - finally: - if s is not None: - s.close() - - # unpacking and packing again should produce same bytestream - my_packet = ndr.ndr_pack(response) - self.assertEquals(my_packet, recv_packet[2:]) - - return (response, recv_packet[2:]) - - def make_txt_update(self, prefix, txt_array): - p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) - updates = [] - - name = self.get_dns_domain() - u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) - updates.append(u) - self.finish_name_packet(p, updates) - - updates = [] - r = dns.res_rec() - r.name = "%s.%s" % (prefix, self.get_dns_domain()) - r.rr_type = dns.DNS_QTYPE_TXT - r.rr_class = dns.DNS_QCLASS_IN - r.ttl = 900 - r.length = 0xffff - rdata = self.make_txt_record(txt_array) - r.rdata = rdata - updates.append(r) - p.nscount = len(updates) - p.nsrecs = updates - - return p - - def check_query_txt(self, prefix, txt_array): - name = "%s.%s" % (prefix, self.get_dns_domain()) - p = self.make_name_packet(dns.DNS_OPCODE_QUERY) - questions = [] - - q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN) - questions.append(q) - - self.finish_name_packet(p, questions) - (response, response_packet) = self.dns_transaction_udp(p, host=self.server_ip) - self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) - self.assertEquals(response.ancount, 1) - self.assertEquals(response.answers[0].rdata.txt.str, txt_array) - - class TestSimpleQueries(DNSTest): def setUp(self): super(TestSimpleQueries, self).setUp() |