diff options
author | Shane Harvey <shane.harvey@mongodb.com> | 2020-08-05 16:48:51 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-08-05 18:48:51 -0500 |
commit | 33c5499ce34f5e1c7c2630c6a1446353eee31755 (patch) | |
tree | 45f4871892c7b5e29e46fca2009e5cf77dc9eaa8 /tests | |
parent | bb971ae935059b73830ea2abe3f66391125b2bfb (diff) | |
download | pyopenssl-git-33c5499ce34f5e1c7c2630c6a1446353eee31755.tar.gz |
Allow accessing a connection's verfied certificate chain (#894)
* Allow accessing a connection's verfied certificate chain
Add X509StoreContext.get_verified_chain using X509_STORE_CTX_get1_chain.
Add Connection.get_verified_chain using SSL_get0_verified_chain if
available (ie OpenSSL 1.1+) and X509StoreContext.get_verified_chain
otherwise.
Fixes #740.
* TLSv1_METHOD -> SSLv23_METHOD
* Use X509_up_ref instead of X509_dup
* Add _openssl_assert where appropriate
* SSL_get_peer_cert_chain should not be null
* Reformat with black
* Fix <OpenSSL.crypto.X509 object at 0x7fdbb59e8050> != <OpenSSL.crypto.X509 object at 0x7fdbb59daad0>
* Add Changelog entry
* Remove _add_chain
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_crypto.py | 35 | ||||
-rw-r--r-- | tests/test_ssl.py | 57 |
2 files changed, 92 insertions, 0 deletions
diff --git a/tests/test_crypto.py b/tests/test_crypto.py index 3802d9a..ac4e729 100644 --- a/tests/test_crypto.py +++ b/tests/test_crypto.py @@ -3849,6 +3849,41 @@ class TestX509StoreContext(object): assert exc.value.args[0][2] == "certificate has expired" + def test_get_verified_chain(self): + """ + `get_verified_chain` returns the verified chain. + """ + store = X509Store() + store.add_cert(self.root_cert) + store.add_cert(self.intermediate_cert) + store_ctx = X509StoreContext(store, self.intermediate_server_cert) + chain = store_ctx.get_verified_chain() + assert len(chain) == 3 + intermediate_subject = self.intermediate_server_cert.get_subject() + assert chain[0].get_subject() == intermediate_subject + assert chain[1].get_subject() == self.intermediate_cert.get_subject() + assert chain[2].get_subject() == self.root_cert.get_subject() + # Test reuse + chain = store_ctx.get_verified_chain() + assert len(chain) == 3 + assert chain[0].get_subject() == intermediate_subject + assert chain[1].get_subject() == self.intermediate_cert.get_subject() + assert chain[2].get_subject() == self.root_cert.get_subject() + + def test_get_verified_chain_invalid_chain_no_root(self): + """ + `get_verified_chain` raises error when cert verification fails. + """ + store = X509Store() + store.add_cert(self.intermediate_cert) + store_ctx = X509StoreContext(store, self.intermediate_server_cert) + + with pytest.raises(X509StoreContextError) as exc: + store_ctx.get_verified_chain() + + assert exc.value.args[0][2] == "unable to get issuer certificate" + assert exc.value.certificate.get_subject().CN == "intermediate" + class TestSignVerify(object): """ diff --git a/tests/test_ssl.py b/tests/test_ssl.py index 7e28ab7..9f134b4 100644 --- a/tests/test_ssl.py +++ b/tests/test_ssl.py @@ -2445,6 +2445,63 @@ class TestConnection(object): interact_in_memory(client, server) assert None is server.get_peer_cert_chain() + def test_get_verified_chain(self): + """ + `Connection.get_verified_chain` returns a list of certificates + which the connected server returned for the certification verification. + """ + chain = _create_certificate_chain() + [(cakey, cacert), (ikey, icert), (skey, scert)] = chain + + serverContext = Context(SSLv23_METHOD) + serverContext.use_privatekey(skey) + serverContext.use_certificate(scert) + serverContext.add_extra_chain_cert(icert) + serverContext.add_extra_chain_cert(cacert) + server = Connection(serverContext, None) + server.set_accept_state() + + # Create the client + clientContext = Context(SSLv23_METHOD) + # cacert is self-signed so the client must trust it for verification + # to succeed. + clientContext.get_cert_store().add_cert(cacert) + clientContext.set_verify(VERIFY_PEER, verify_cb) + client = Connection(clientContext, None) + client.set_connect_state() + + interact_in_memory(client, server) + + chain = client.get_verified_chain() + assert len(chain) == 3 + assert "Server Certificate" == chain[0].get_subject().CN + assert "Intermediate Certificate" == chain[1].get_subject().CN + assert "Authority Certificate" == chain[2].get_subject().CN + + def test_get_verified_chain_none(self): + """ + `Connection.get_verified_chain` returns `None` if the peer sends + no certificate chain. + """ + ctx = Context(SSLv23_METHOD) + ctx.use_privatekey(load_privatekey(FILETYPE_PEM, server_key_pem)) + ctx.use_certificate(load_certificate(FILETYPE_PEM, server_cert_pem)) + server = Connection(ctx, None) + server.set_accept_state() + client = Connection(Context(SSLv23_METHOD), None) + client.set_connect_state() + interact_in_memory(client, server) + assert None is server.get_verified_chain() + + def test_get_verified_chain_unconnected(self): + """ + `Connection.get_verified_chain` returns `None` when used with an object + which has not been connected. + """ + ctx = Context(SSLv23_METHOD) + server = Connection(ctx, None) + assert None is server.get_verified_chain() + def test_get_session_unconnected(self): """ `Connection.get_session` returns `None` when used with an object |