summaryrefslogtreecommitdiff
path: root/tests/test_tsig.py
diff options
context:
space:
mode:
authorNick Hall <nick.hall@deshaw.com>2020-08-07 23:00:36 +0100
committerNick Hall <nick.hall@deshaw.com>2020-08-08 01:21:24 +0100
commita7ae91d15d62798567e9c6ef5758779ef2654bb6 (patch)
tree864c77c73e9009e5f10b7969d3b04d3c318a534b /tests/test_tsig.py
parent40bf9335e823a6760614b4a835ebd06af4279c66 (diff)
downloaddnspython-a7ae91d15d62798567e9c6ef5758779ef2654bb6.tar.gz
Support callable() TSIG keyrings for use-cases like GSSTSig.
Diffstat (limited to 'tests/test_tsig.py')
-rw-r--r--tests/test_tsig.py83
1 files changed, 69 insertions, 14 deletions
diff --git a/tests/test_tsig.py b/tests/test_tsig.py
index c1cd5cb..ec5a6cc 100644
--- a/tests/test_tsig.py
+++ b/tests/test_tsig.py
@@ -3,6 +3,7 @@
import unittest
from unittest.mock import Mock
import time
+import base64
import dns.rcode
import dns.tsig
@@ -46,12 +47,6 @@ class TSIGTestCase(unittest.TestCase):
self.assertEqual(m.tsig_error, dns.rcode.BADKEY)
def test_verify_mac_for_context(self):
- dummy_ctx = None
- dummy_expected = None
- key = dns.tsig.Key('foo.com', 'abcd', 'bogus')
- with self.assertRaises(NotImplementedError):
- dummy_ctx = dns.tsig.get_context(key)
-
key = dns.tsig.Key('foo.com', 'abcd', 'hmac-sha512')
ctx = dns.tsig.get_context(key)
bad_expected = b'xxxxxxxxxx'
@@ -97,10 +92,11 @@ class TSIGTestCase(unittest.TestCase):
gssapi_context_mock.verify_signature.side_effect = verify_signature
# create the key and add it to the keyring
- key = dns.tsig.Key('gsstsigtest', gssapi_context_mock, 'gss-tsig')
+ keyname = 'gsstsigtest'
+ key = dns.tsig.Key(keyname, gssapi_context_mock, 'gss-tsig')
ctx = dns.tsig.get_context(key)
self.assertEqual(ctx.name, 'gss-tsig')
- gsskeyname = dns.name.from_text('gsstsigtest')
+ gsskeyname = dns.name.from_text(keyname)
keyring[gsskeyname] = key
# make sure we can get the keyring (no exception == success)
@@ -114,18 +110,65 @@ class TSIGTestCase(unittest.TestCase):
gssapi_context_mock.verify_signature.assert_called()
self.assertEqual(gssapi_context_mock.verify_signature.call_count, 1)
- # create example message and go to/from wire to simulate sign/verify
- m = dns.message.make_query('example', 'a')
- m.use_tsig(keyring, gsskeyname)
- w = m.to_wire()
- # not raising is passing
- dns.message.from_wire(w, keyring)
+ # simulate case where TKEY message is used to establish the context;
+ # first, the query from the client
+ tkey_message = dns.message.make_query(keyname, 'tkey', 'any')
+
+ # test existent/non-existent keys in the keyring
+ adapted_keyring = dns.tsig.GSSTSigAdapter(keyring)
+
+ fetched_key = adapted_keyring(tkey_message, gsskeyname)
+ self.assertEqual(fetched_key, key)
+ key = adapted_keyring(None, gsskeyname)
+ self.assertEqual(fetched_key, key)
+ key = adapted_keyring(tkey_message, "dummy")
+ self.assertEqual(key, None)
+
+ # create a response, TKEY and turn it into bytes, simulating the server
+ # sending the response to the query
+ tkey_response = dns.message.make_response(tkey_message)
+ key = base64.b64decode('KEYKEYKEYKEYKEYKEYKEYKEYKEYKEYKEYKEY')
+ tkey = dns.rdtypes.ANY.TKEY.TKEY(dns.rdataclass.ANY,
+ dns.rdatatype.TKEY,
+ dns.name.from_text('gss-tsig.'),
+ 1594203795, 1594206664,
+ 3, 0, key)
+
+ # add the TKEY answer and sign it
+ tkey_response.set_rcode(dns.rcode.NOERROR)
+ tkey_response.answer = [
+ dns.rrset.from_rdata(dns.name.from_text(keyname), 0, tkey)]
+ tkey_response.use_tsig(keyring=dns.tsig.GSSTSigAdapter(keyring),
+ keyname=gsskeyname,
+ algorithm=dns.tsig.GSS_TSIG)
+
+ # "send" it to the client
+ tkey_wire = tkey_response.to_wire()
+
+ # grab the response from the "server" and simulate the client side
+ dns.message.from_wire(tkey_wire, dns.tsig.GSSTSigAdapter(keyring))
# assertions to make sure the "gssapi" functions were called
gssapi_context_mock.get_signature.assert_called()
self.assertEqual(gssapi_context_mock.get_signature.call_count, 1)
gssapi_context_mock.verify_signature.assert_called()
self.assertEqual(gssapi_context_mock.verify_signature.call_count, 2)
+ gssapi_context_mock.step.assert_called()
+ self.assertEqual(gssapi_context_mock.step.call_count, 1)
+
+ # create example message and go to/from wire to simulate sign/verify
+ # of regular messages
+ a_message = dns.message.make_query('example', 'a')
+ a_message.use_tsig(dns.tsig.GSSTSigAdapter(keyring), gsskeyname)
+ a_wire = a_message.to_wire()
+ # not raising is passing
+ dns.message.from_wire(a_wire, dns.tsig.GSSTSigAdapter(keyring))
+
+ # assertions to make sure the "gssapi" functions were called again
+ gssapi_context_mock.get_signature.assert_called()
+ self.assertEqual(gssapi_context_mock.get_signature.call_count, 2)
+ gssapi_context_mock.verify_signature.assert_called()
+ self.assertEqual(gssapi_context_mock.verify_signature.call_count, 3)
def test_sign_and_validate(self):
m = dns.message.make_query('example', 'a')
@@ -134,6 +177,18 @@ class TSIGTestCase(unittest.TestCase):
# not raising is passing
dns.message.from_wire(w, keyring)
+ def test_validate_with_bad_keyring(self):
+ m = dns.message.make_query('example', 'a')
+ m.use_tsig(keyring, keyname)
+ w = m.to_wire()
+
+ # keyring == None is an error
+ with self.assertRaises(dns.message.UnknownTSIGKey):
+ dns.message.from_wire(w, None)
+ # callable keyring that returns None is an error
+ with self.assertRaises(dns.message.UnknownTSIGKey):
+ dns.message.from_wire(w, lambda m, n: None)
+
def test_sign_and_validate_with_other_data(self):
m = dns.message.make_query('example', 'a')
m.use_tsig(keyring, keyname, other_data=b'other')