diff options
| author | Nick Hall <nick.hall@deshaw.com> | 2020-08-07 23:00:36 +0100 |
|---|---|---|
| committer | Nick Hall <nick.hall@deshaw.com> | 2020-08-08 01:21:24 +0100 |
| commit | a7ae91d15d62798567e9c6ef5758779ef2654bb6 (patch) | |
| tree | 864c77c73e9009e5f10b7969d3b04d3c318a534b /tests/test_tsig.py | |
| parent | 40bf9335e823a6760614b4a835ebd06af4279c66 (diff) | |
| download | dnspython-a7ae91d15d62798567e9c6ef5758779ef2654bb6.tar.gz | |
Support callable() TSIG keyrings for use-cases like GSSTSig.
Diffstat (limited to 'tests/test_tsig.py')
| -rw-r--r-- | tests/test_tsig.py | 83 |
1 files changed, 69 insertions, 14 deletions
diff --git a/tests/test_tsig.py b/tests/test_tsig.py index c1cd5cb..ec5a6cc 100644 --- a/tests/test_tsig.py +++ b/tests/test_tsig.py @@ -3,6 +3,7 @@ import unittest from unittest.mock import Mock import time +import base64 import dns.rcode import dns.tsig @@ -46,12 +47,6 @@ class TSIGTestCase(unittest.TestCase): self.assertEqual(m.tsig_error, dns.rcode.BADKEY) def test_verify_mac_for_context(self): - dummy_ctx = None - dummy_expected = None - key = dns.tsig.Key('foo.com', 'abcd', 'bogus') - with self.assertRaises(NotImplementedError): - dummy_ctx = dns.tsig.get_context(key) - key = dns.tsig.Key('foo.com', 'abcd', 'hmac-sha512') ctx = dns.tsig.get_context(key) bad_expected = b'xxxxxxxxxx' @@ -97,10 +92,11 @@ class TSIGTestCase(unittest.TestCase): gssapi_context_mock.verify_signature.side_effect = verify_signature # create the key and add it to the keyring - key = dns.tsig.Key('gsstsigtest', gssapi_context_mock, 'gss-tsig') + keyname = 'gsstsigtest' + key = dns.tsig.Key(keyname, gssapi_context_mock, 'gss-tsig') ctx = dns.tsig.get_context(key) self.assertEqual(ctx.name, 'gss-tsig') - gsskeyname = dns.name.from_text('gsstsigtest') + gsskeyname = dns.name.from_text(keyname) keyring[gsskeyname] = key # make sure we can get the keyring (no exception == success) @@ -114,18 +110,65 @@ class TSIGTestCase(unittest.TestCase): gssapi_context_mock.verify_signature.assert_called() self.assertEqual(gssapi_context_mock.verify_signature.call_count, 1) - # create example message and go to/from wire to simulate sign/verify - m = dns.message.make_query('example', 'a') - m.use_tsig(keyring, gsskeyname) - w = m.to_wire() - # not raising is passing - dns.message.from_wire(w, keyring) + # simulate case where TKEY message is used to establish the context; + # first, the query from the client + tkey_message = dns.message.make_query(keyname, 'tkey', 'any') + + # test existent/non-existent keys in the keyring + adapted_keyring = dns.tsig.GSSTSigAdapter(keyring) + + fetched_key = adapted_keyring(tkey_message, gsskeyname) + self.assertEqual(fetched_key, key) + key = adapted_keyring(None, gsskeyname) + self.assertEqual(fetched_key, key) + key = adapted_keyring(tkey_message, "dummy") + self.assertEqual(key, None) + + # create a response, TKEY and turn it into bytes, simulating the server + # sending the response to the query + tkey_response = dns.message.make_response(tkey_message) + key = base64.b64decode('KEYKEYKEYKEYKEYKEYKEYKEYKEYKEYKEYKEY') + tkey = dns.rdtypes.ANY.TKEY.TKEY(dns.rdataclass.ANY, + dns.rdatatype.TKEY, + dns.name.from_text('gss-tsig.'), + 1594203795, 1594206664, + 3, 0, key) + + # add the TKEY answer and sign it + tkey_response.set_rcode(dns.rcode.NOERROR) + tkey_response.answer = [ + dns.rrset.from_rdata(dns.name.from_text(keyname), 0, tkey)] + tkey_response.use_tsig(keyring=dns.tsig.GSSTSigAdapter(keyring), + keyname=gsskeyname, + algorithm=dns.tsig.GSS_TSIG) + + # "send" it to the client + tkey_wire = tkey_response.to_wire() + + # grab the response from the "server" and simulate the client side + dns.message.from_wire(tkey_wire, dns.tsig.GSSTSigAdapter(keyring)) # assertions to make sure the "gssapi" functions were called gssapi_context_mock.get_signature.assert_called() self.assertEqual(gssapi_context_mock.get_signature.call_count, 1) gssapi_context_mock.verify_signature.assert_called() self.assertEqual(gssapi_context_mock.verify_signature.call_count, 2) + gssapi_context_mock.step.assert_called() + self.assertEqual(gssapi_context_mock.step.call_count, 1) + + # create example message and go to/from wire to simulate sign/verify + # of regular messages + a_message = dns.message.make_query('example', 'a') + a_message.use_tsig(dns.tsig.GSSTSigAdapter(keyring), gsskeyname) + a_wire = a_message.to_wire() + # not raising is passing + dns.message.from_wire(a_wire, dns.tsig.GSSTSigAdapter(keyring)) + + # assertions to make sure the "gssapi" functions were called again + gssapi_context_mock.get_signature.assert_called() + self.assertEqual(gssapi_context_mock.get_signature.call_count, 2) + gssapi_context_mock.verify_signature.assert_called() + self.assertEqual(gssapi_context_mock.verify_signature.call_count, 3) def test_sign_and_validate(self): m = dns.message.make_query('example', 'a') @@ -134,6 +177,18 @@ class TSIGTestCase(unittest.TestCase): # not raising is passing dns.message.from_wire(w, keyring) + def test_validate_with_bad_keyring(self): + m = dns.message.make_query('example', 'a') + m.use_tsig(keyring, keyname) + w = m.to_wire() + + # keyring == None is an error + with self.assertRaises(dns.message.UnknownTSIGKey): + dns.message.from_wire(w, None) + # callable keyring that returns None is an error + with self.assertRaises(dns.message.UnknownTSIGKey): + dns.message.from_wire(w, lambda m, n: None) + def test_sign_and_validate_with_other_data(self): m = dns.message.make_query('example', 'a') m.use_tsig(keyring, keyname, other_data=b'other') |
