diff options
author | Alex Gaynor <alex.gaynor@gmail.com> | 2020-07-23 20:40:46 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-23 19:40:46 -0500 |
commit | 037371861693f26297320dcd5fd8c221b6d8df26 (patch) | |
tree | ab18ca46617b0036e137cd6a154726acbab36bdf /src | |
parent | 4ca4fb9e8ed3c45f09efab8269e4078d40f39d9b (diff) | |
download | pyopenssl-git-037371861693f26297320dcd5fd8c221b6d8df26.tar.gz |
Paint it Black by the Rolling Stones (#920)
Diffstat (limited to 'src')
-rw-r--r-- | src/OpenSSL/SSL.py | 357 | ||||
-rw-r--r-- | src/OpenSSL/__init__.py | 24 | ||||
-rw-r--r-- | src/OpenSSL/_util.py | 20 | ||||
-rw-r--r-- | src/OpenSSL/crypto.py | 248 | ||||
-rw-r--r-- | src/OpenSSL/version.py | 10 |
5 files changed, 387 insertions, 272 deletions
diff --git a/src/OpenSSL/SSL.py b/src/OpenSSL/SSL.py index 25308f1..b4b308f 100644 --- a/src/OpenSSL/SSL.py +++ b/src/OpenSSL/SSL.py @@ -23,100 +23,108 @@ from OpenSSL._util import ( ) from OpenSSL.crypto import ( - FILETYPE_PEM, _PassphraseHelper, PKey, X509Name, X509, X509Store) + FILETYPE_PEM, + _PassphraseHelper, + PKey, + X509Name, + X509, + X509Store, +) __all__ = [ - 'OPENSSL_VERSION_NUMBER', - 'SSLEAY_VERSION', - 'SSLEAY_CFLAGS', - 'SSLEAY_PLATFORM', - 'SSLEAY_DIR', - 'SSLEAY_BUILT_ON', - 'SENT_SHUTDOWN', - 'RECEIVED_SHUTDOWN', - 'SSLv2_METHOD', - 'SSLv3_METHOD', - 'SSLv23_METHOD', - 'TLSv1_METHOD', - 'TLSv1_1_METHOD', - 'TLSv1_2_METHOD', - 'OP_NO_SSLv2', - 'OP_NO_SSLv3', - 'OP_NO_TLSv1', - 'OP_NO_TLSv1_1', - 'OP_NO_TLSv1_2', - 'OP_NO_TLSv1_3', - 'MODE_RELEASE_BUFFERS', - 'OP_SINGLE_DH_USE', - 'OP_SINGLE_ECDH_USE', - 'OP_EPHEMERAL_RSA', - 'OP_MICROSOFT_SESS_ID_BUG', - 'OP_NETSCAPE_CHALLENGE_BUG', - 'OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG', - 'OP_SSLREF2_REUSE_CERT_TYPE_BUG', - 'OP_MICROSOFT_BIG_SSLV3_BUFFER', - 'OP_MSIE_SSLV2_RSA_PADDING', - 'OP_SSLEAY_080_CLIENT_DH_BUG', - 'OP_TLS_D5_BUG', - 'OP_TLS_BLOCK_PADDING_BUG', - 'OP_DONT_INSERT_EMPTY_FRAGMENTS', - 'OP_CIPHER_SERVER_PREFERENCE', - 'OP_TLS_ROLLBACK_BUG', - 'OP_PKCS1_CHECK_1', - 'OP_PKCS1_CHECK_2', - 'OP_NETSCAPE_CA_DN_BUG', - 'OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG', - 'OP_NO_COMPRESSION', - 'OP_NO_QUERY_MTU', - 'OP_COOKIE_EXCHANGE', - 'OP_NO_TICKET', - 'OP_ALL', - 'VERIFY_PEER', - 'VERIFY_FAIL_IF_NO_PEER_CERT', - 'VERIFY_CLIENT_ONCE', - 'VERIFY_NONE', - 'SESS_CACHE_OFF', - 'SESS_CACHE_CLIENT', - 'SESS_CACHE_SERVER', - 'SESS_CACHE_BOTH', - 'SESS_CACHE_NO_AUTO_CLEAR', - 'SESS_CACHE_NO_INTERNAL_LOOKUP', - 'SESS_CACHE_NO_INTERNAL_STORE', - 'SESS_CACHE_NO_INTERNAL', - 'SSL_ST_CONNECT', - 'SSL_ST_ACCEPT', - 'SSL_ST_MASK', - 'SSL_CB_LOOP', - 'SSL_CB_EXIT', - 'SSL_CB_READ', - 'SSL_CB_WRITE', - 'SSL_CB_ALERT', - 'SSL_CB_READ_ALERT', - 'SSL_CB_WRITE_ALERT', - 'SSL_CB_ACCEPT_LOOP', - 'SSL_CB_ACCEPT_EXIT', - 'SSL_CB_CONNECT_LOOP', - 'SSL_CB_CONNECT_EXIT', - 'SSL_CB_HANDSHAKE_START', - 'SSL_CB_HANDSHAKE_DONE', - 'Error', - 'WantReadError', - 'WantWriteError', - 'WantX509LookupError', - 'ZeroReturnError', - 'SysCallError', - 'SSLeay_version', - 'Session', - 'Context', - 'Connection' + "OPENSSL_VERSION_NUMBER", + "SSLEAY_VERSION", + "SSLEAY_CFLAGS", + "SSLEAY_PLATFORM", + "SSLEAY_DIR", + "SSLEAY_BUILT_ON", + "SENT_SHUTDOWN", + "RECEIVED_SHUTDOWN", + "SSLv2_METHOD", + "SSLv3_METHOD", + "SSLv23_METHOD", + "TLSv1_METHOD", + "TLSv1_1_METHOD", + "TLSv1_2_METHOD", + "OP_NO_SSLv2", + "OP_NO_SSLv3", + "OP_NO_TLSv1", + "OP_NO_TLSv1_1", + "OP_NO_TLSv1_2", + "OP_NO_TLSv1_3", + "MODE_RELEASE_BUFFERS", + "OP_SINGLE_DH_USE", + "OP_SINGLE_ECDH_USE", + "OP_EPHEMERAL_RSA", + "OP_MICROSOFT_SESS_ID_BUG", + "OP_NETSCAPE_CHALLENGE_BUG", + "OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG", + "OP_SSLREF2_REUSE_CERT_TYPE_BUG", + "OP_MICROSOFT_BIG_SSLV3_BUFFER", + "OP_MSIE_SSLV2_RSA_PADDING", + "OP_SSLEAY_080_CLIENT_DH_BUG", + "OP_TLS_D5_BUG", + "OP_TLS_BLOCK_PADDING_BUG", + "OP_DONT_INSERT_EMPTY_FRAGMENTS", + "OP_CIPHER_SERVER_PREFERENCE", + "OP_TLS_ROLLBACK_BUG", + "OP_PKCS1_CHECK_1", + "OP_PKCS1_CHECK_2", + "OP_NETSCAPE_CA_DN_BUG", + "OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG", + "OP_NO_COMPRESSION", + "OP_NO_QUERY_MTU", + "OP_COOKIE_EXCHANGE", + "OP_NO_TICKET", + "OP_ALL", + "VERIFY_PEER", + "VERIFY_FAIL_IF_NO_PEER_CERT", + "VERIFY_CLIENT_ONCE", + "VERIFY_NONE", + "SESS_CACHE_OFF", + "SESS_CACHE_CLIENT", + "SESS_CACHE_SERVER", + "SESS_CACHE_BOTH", + "SESS_CACHE_NO_AUTO_CLEAR", + "SESS_CACHE_NO_INTERNAL_LOOKUP", + "SESS_CACHE_NO_INTERNAL_STORE", + "SESS_CACHE_NO_INTERNAL", + "SSL_ST_CONNECT", + "SSL_ST_ACCEPT", + "SSL_ST_MASK", + "SSL_CB_LOOP", + "SSL_CB_EXIT", + "SSL_CB_READ", + "SSL_CB_WRITE", + "SSL_CB_ALERT", + "SSL_CB_READ_ALERT", + "SSL_CB_WRITE_ALERT", + "SSL_CB_ACCEPT_LOOP", + "SSL_CB_ACCEPT_EXIT", + "SSL_CB_CONNECT_LOOP", + "SSL_CB_CONNECT_EXIT", + "SSL_CB_HANDSHAKE_START", + "SSL_CB_HANDSHAKE_DONE", + "Error", + "WantReadError", + "WantWriteError", + "WantX509LookupError", + "ZeroReturnError", + "SysCallError", + "SSLeay_version", + "Session", + "Context", + "Connection", ] try: _buffer = buffer except NameError: + class _buffer(object): pass + OPENSSL_VERSION_NUMBER = _lib.OPENSSL_VERSION_NUMBER SSLEAY_VERSION = _lib.SSLEAY_VERSION SSLEAY_CFLAGS = _lib.SSLEAY_CFLAGS @@ -199,12 +207,9 @@ if _lib.Cryptography_HAS_SSL_ST: SSL_ST_BEFORE = _lib.SSL_ST_BEFORE SSL_ST_OK = _lib.SSL_ST_OK SSL_ST_RENEGOTIATE = _lib.SSL_ST_RENEGOTIATE - __all__.extend([ - 'SSL_ST_INIT', - 'SSL_ST_BEFORE', - 'SSL_ST_OK', - 'SSL_ST_RENEGOTIATE', - ]) + __all__.extend( + ["SSL_ST_INIT", "SSL_ST_BEFORE", "SSL_ST_OK", "SSL_ST_RENEGOTIATE"] + ) SSL_CB_LOOP = _lib.SSL_CB_LOOP SSL_CB_EXIT = _lib.SSL_CB_EXIT @@ -333,7 +338,8 @@ class _VerifyHelper(_CallbackExceptionHelper): return 0 self.callback = _ffi.callback( - "int (*)(int, X509_STORE_CTX *)", wrapper) + "int (*)(int, X509_STORE_CTX *)", wrapper + ) class _NpnAdvertiseHelper(_CallbackExceptionHelper): @@ -352,7 +358,7 @@ class _NpnAdvertiseHelper(_CallbackExceptionHelper): # Join the protocols into a Python bytestring, length-prefixing # each element. - protostr = b''.join( + protostr = b"".join( chain.from_iterable((int2byte(len(p)), p) for p in protos) ) @@ -373,7 +379,7 @@ class _NpnAdvertiseHelper(_CallbackExceptionHelper): self.callback = _ffi.callback( "int (*)(SSL *, const unsigned char **, unsigned int *, void *)", - wrapper + wrapper, ) @@ -397,9 +403,9 @@ class _NpnSelectHelper(_CallbackExceptionHelper): protolist = [] while instr: length = indexbytes(instr, 0) - proto = instr[1:length + 1] + proto = instr[1 : length + 1] protolist.append(proto) - instr = instr[length + 1:] + instr = instr[length + 1 :] # Call the callback outstr = callback(conn, protolist) @@ -420,9 +426,11 @@ class _NpnSelectHelper(_CallbackExceptionHelper): return 2 # SSL_TLSEXT_ERR_ALERT_FATAL self.callback = _ffi.callback( - ("int (*)(SSL *, unsigned char **, unsigned char *, " - "const unsigned char *, unsigned int, void *)"), - wrapper + ( + "int (*)(SSL *, unsigned char **, unsigned char *, " + "const unsigned char *, unsigned int, void *)" + ), + wrapper, ) @@ -449,15 +457,15 @@ class _ALPNSelectHelper(_CallbackExceptionHelper): protolist = [] while instr: encoded_len = indexbytes(instr, 0) - proto = instr[1:encoded_len + 1] + proto = instr[1 : encoded_len + 1] protolist.append(proto) - instr = instr[encoded_len + 1:] + instr = instr[encoded_len + 1 :] # Call the callback outbytes = callback(conn, protolist) any_accepted = True if outbytes is NO_OVERLAPPING_PROTOCOLS: - outbytes = b'' + outbytes = b"" any_accepted = False elif not isinstance(outbytes, bytes): raise TypeError( @@ -482,9 +490,11 @@ class _ALPNSelectHelper(_CallbackExceptionHelper): return _lib.SSL_TLSEXT_ERR_ALERT_FATAL self.callback = _ffi.callback( - ("int (*)(SSL *, unsigned char **, unsigned char *, " - "const unsigned char *, unsigned int, void *)"), - wrapper + ( + "int (*)(SSL *, unsigned char **, unsigned char *, " + "const unsigned char *, unsigned int, void *)" + ), + wrapper, ) @@ -596,7 +606,7 @@ class _OCSPClientCallbackHelper(_CallbackExceptionHelper): ocsp_len = _lib.SSL_get_tlsext_status_ocsp_resp(ssl, ocsp_ptr) if ocsp_len < 0: # No OCSP data. - ocsp_data = b'' + ocsp_data = b"" else: # Copy the OCSP data, then pass it to the callback. ocsp_data = _ffi.buffer(ocsp_ptr[0], ocsp_len)[:] @@ -628,7 +638,8 @@ def _asFileDescriptor(obj): raise TypeError("argument must be an int, or have a fileno() method.") elif fd < 0: raise ValueError( - "file descriptor cannot be a negative integer (%i)" % (fd,)) + "file descriptor cannot be a negative integer (%i)" % (fd,) + ) return fd @@ -643,8 +654,11 @@ def SSLeay_version(type): def _warn_npn(): - warnings.warn("NPN is deprecated. Protocols should switch to using ALPN.", - DeprecationWarning, stacklevel=3) + warnings.warn( + "NPN is deprecated. Protocols should switch to using ALPN.", + DeprecationWarning, + stacklevel=3, + ) def _make_requires(flag, error): @@ -657,11 +671,14 @@ def _make_requires(flag, error): ``Cryptography_HAS_NEXTPROTONEG``. :param error: The string to be used in the exception if the flag is false. """ + def _requires_decorator(func): if not flag: + @wraps(func) def explode(*args, **kwargs): raise NotImplementedError(error) + return explode else: return func @@ -687,6 +704,7 @@ class Session(object): .. versionadded:: 0.14 """ + pass @@ -698,6 +716,7 @@ class Context(object): :param method: One of SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, or TLSv1_METHOD. """ + _methods = { SSLv2_METHOD: "SSLv2_method", SSLv3_METHOD: "SSLv3_method", @@ -709,7 +728,8 @@ class Context(object): _methods = dict( (identifier, getattr(_lib, name)) for (identifier, name) in _methods.items() - if getattr(_lib, name, None) is not None) + if getattr(_lib, name, None) is not None + ) def __init__(self, method): if not isinstance(method, integer_types): @@ -790,8 +810,10 @@ class Context(object): @wraps(callback) def wrapper(size, verify, userdata): return callback(size, verify, self._passphrase_userdata) + return _PassphraseHelper( - FILETYPE_PEM, wrapper, more_args=True, truncate=True) + FILETYPE_PEM, wrapper, more_args=True, truncate=True + ) def set_passwd_cb(self, callback, userdata=None): """ @@ -818,7 +840,8 @@ class Context(object): self._passphrase_helper = self._wrap_callback(callback) self._passphrase_callback = self._passphrase_helper.callback _lib.SSL_CTX_set_default_passwd_cb( - self._context, self._passphrase_callback) + self._context, self._passphrase_callback + ) self._passphrase_userdata = userdata def set_default_verify_paths(self): @@ -848,9 +871,9 @@ class Context(object): # First we'll check to see if any env vars have been set. If so, # we won't try to do anything else because the user has set the path # themselves. - dir_env_var = _ffi.string( - _lib.X509_get_default_cert_dir_env() - ).decode("ascii") + dir_env_var = _ffi.string(_lib.X509_get_default_cert_dir_env()).decode( + "ascii" + ) file_env_var = _ffi.string( _lib.X509_get_default_cert_file_env() ).decode("ascii") @@ -861,13 +884,12 @@ class Context(object): # to the exact values we use in our manylinux1 builds. If they are # then we know to load the fallbacks if ( - default_dir == _CRYPTOGRAPHY_MANYLINUX1_CA_DIR and - default_file == _CRYPTOGRAPHY_MANYLINUX1_CA_FILE + default_dir == _CRYPTOGRAPHY_MANYLINUX1_CA_DIR + and default_file == _CRYPTOGRAPHY_MANYLINUX1_CA_FILE ): # This is manylinux1, let's load our fallback paths self._fallback_default_verify_paths( - _CERTIFICATE_FILE_LOCATIONS, - _CERTIFICATE_PATH_LOCATIONS + _CERTIFICATE_FILE_LOCATIONS, _CERTIFICATE_PATH_LOCATIONS ) def _check_env_vars_set(self, dir_env_var, file_env_var): @@ -877,8 +899,8 @@ class Context(object): :return: bool """ return ( - os.environ.get(file_env_var) is not None or - os.environ.get(dir_env_var) is not None + os.environ.get(file_env_var) is not None + or os.environ.get(dir_env_var) is not None ) def _fallback_default_verify_paths(self, file_path, dir_path): @@ -996,7 +1018,8 @@ class Context(object): raise TypeError("filetype must be an integer") use_result = _lib.SSL_CTX_use_PrivateKey_file( - self._context, keyfile, filetype) + self._context, keyfile, filetype + ) if not use_result: self._raise_passphrase_exception() @@ -1052,11 +1075,8 @@ class Context(object): """ buf = _text_to_bytes_and_warn("buf", buf) _openssl_assert( - _lib.SSL_CTX_set_session_id_context( - self._context, - buf, - len(buf), - ) == 1 + _lib.SSL_CTX_set_session_id_context(self._context, buf, len(buf),) + == 1 ) def set_session_cache_mode(self, mode): @@ -1202,19 +1222,17 @@ class Context(object): # invalid cipher string is passed, but without the following check # for the TLS 1.3 specific cipher suites it would never error. tmpconn = Connection(self, None) - if ( - tmpconn.get_cipher_list() == [ - 'TLS_AES_256_GCM_SHA384', - 'TLS_CHACHA20_POLY1305_SHA256', - 'TLS_AES_128_GCM_SHA256' - ] - ): + if tmpconn.get_cipher_list() == [ + "TLS_AES_256_GCM_SHA384", + "TLS_CHACHA20_POLY1305_SHA256", + "TLS_AES_128_GCM_SHA256", + ]: raise Error( [ ( - 'SSL routines', - 'SSL_CTX_set_cipher_list', - 'no cipher match', + "SSL routines", + "SSL_CTX_set_cipher_list", + "no cipher match", ), ], ) @@ -1240,9 +1258,7 @@ class Context(object): if not isinstance(ca_name, X509Name): raise TypeError( "client CAs must be X509Name objects, not %s " - "objects" % ( - type(ca_name).__name__, - ) + "objects" % (type(ca_name).__name__,) ) copy = _lib.X509_NAME_dup(ca_name._name) _openssl_assert(copy != _ffi.NULL) @@ -1273,7 +1289,8 @@ class Context(object): raise TypeError("certificate_authority must be an X509 instance") add_result = _lib.SSL_CTX_add_client_CA( - self._context, certificate_authority._x509) + self._context, certificate_authority._x509 + ) _openssl_assert(add_result == 1) def set_timeout(self, timeout): @@ -1311,11 +1328,14 @@ class Context(object): function call. :return: None """ + @wraps(callback) def wrapper(ssl, where, return_code): callback(Connection._reverse_mapping[ssl], where, return_code) + self._info_callback = _ffi.callback( - "void (*)(const SSL *, int, int)", wrapper) + "void (*)(const SSL *, int, int)", wrapper + ) _lib.SSL_CTX_set_info_callback(self._context, self._info_callback) def get_app_data(self): @@ -1388,15 +1408,18 @@ class Context(object): .. versionadded:: 0.13 """ + @wraps(callback) def wrapper(ssl, alert, arg): callback(Connection._reverse_mapping[ssl]) return 0 self._tlsext_servername_callback = _ffi.callback( - "int (*)(SSL *, int *, void *)", wrapper) + "int (*)(SSL *, int *, void *)", wrapper + ) _lib.SSL_CTX_set_tlsext_servername_callback( - self._context, self._tlsext_servername_callback) + self._context, self._tlsext_servername_callback + ) def set_tlsext_use_srtp(self, profiles): """ @@ -1431,7 +1454,8 @@ class Context(object): self._npn_advertise_helper = _NpnAdvertiseHelper(callback) self._npn_advertise_callback = self._npn_advertise_helper.callback _lib.SSL_CTX_set_next_protos_advertised_cb( - self._context, self._npn_advertise_callback, _ffi.NULL) + self._context, self._npn_advertise_callback, _ffi.NULL + ) @_requires_npn def set_npn_select_callback(self, callback): @@ -1450,7 +1474,8 @@ class Context(object): self._npn_select_helper = _NpnSelectHelper(callback) self._npn_select_callback = self._npn_select_helper.callback _lib.SSL_CTX_set_next_proto_select_cb( - self._context, self._npn_select_callback, _ffi.NULL) + self._context, self._npn_select_callback, _ffi.NULL + ) @_requires_alpn def set_alpn_protos(self, protos): @@ -1465,7 +1490,7 @@ class Context(object): """ # Take the list of protocols and join them together, prefixing them # with their lengths. - protostr = b''.join( + protostr = b"".join( chain.from_iterable((int2byte(len(p)), p) for p in protos) ) @@ -1492,7 +1517,8 @@ class Context(object): self._alpn_select_helper = _ALPNSelectHelper(callback) self._alpn_select_callback = self._alpn_select_helper.callback _lib.SSL_CTX_set_alpn_select_cb( - self._context, self._alpn_select_callback, _ffi.NULL) + self._context, self._alpn_select_callback, _ffi.NULL + ) def _set_ocsp_callback(self, helper, data): """ @@ -1556,6 +1582,7 @@ class Context(object): class Connection(object): """ """ + _reverse_mapping = WeakValueDictionary() def __init__(self, context, socket=None): @@ -1609,7 +1636,8 @@ class Connection(object): self._from_ssl = None self._socket = socket set_result = _lib.SSL_set_fd( - self._ssl, _asFileDescriptor(self._socket)) + self._ssl, _asFileDescriptor(self._socket) + ) _openssl_assert(set_result == 1) def __getattr__(self, name): @@ -1618,9 +1646,10 @@ class Connection(object): on the Connection object. """ if self._socket is None: - raise AttributeError("'%s' object has no attribute '%s'" % ( - self.__class__.__name__, name - )) + raise AttributeError( + "'%s' object has no attribute '%s'" + % (self.__class__.__name__, name) + ) else: return getattr(self._socket, name) @@ -1777,9 +1806,7 @@ class Connection(object): # SSL_write's num arg is an int, # so we cannot send more than 2**31-1 bytes at once. result = _lib.SSL_write( - self._ssl, - data + total_sent, - min(left_to_send, 2147483647) + self._ssl, data + total_sent, min(left_to_send, 2147483647) ) self._raise_ssl_error(self._ssl, result) total_sent += result @@ -1803,6 +1830,7 @@ class Connection(object): result = _lib.SSL_read(self._ssl, buf, bufsiz) self._raise_ssl_error(self._ssl, result) return _ffi.buffer(buf, result)[:] + read = recv def recv_into(self, buffer, nbytes=None, flags=None): @@ -2069,7 +2097,8 @@ class Connection(object): :raise: NotImplementedError """ raise NotImplementedError( - "Cannot make file object of OpenSSL.SSL.Connection") + "Cannot make file object of OpenSSL.SSL.Connection" + ) def get_app_data(self): """ @@ -2182,10 +2211,16 @@ class Connection(object): context_buf = context context_len = len(context) use_context = 1 - success = _lib.SSL_export_keying_material(self._ssl, outp, olen, - label, len(label), - context_buf, context_len, - use_context) + success = _lib.SSL_export_keying_material( + self._ssl, + outp, + olen, + label, + len(label), + context_buf, + context_len, + use_context, + ) _openssl_assert(success == 1) return _ffi.buffer(outp, olen)[:] @@ -2470,7 +2505,7 @@ class Connection(object): """ # Take the list of protocols and join them together, prefixing them # with their lengths. - protostr = b''.join( + protostr = b"".join( chain.from_iterable((int2byte(len(p)), p) for p in protos) ) @@ -2493,7 +2528,7 @@ class Connection(object): _lib.SSL_get0_alpn_selected(self._ssl, data, data_len) if not data_len: - return b'' + return b"" return _ffi.buffer(data[0], data_len[0])[:] diff --git a/src/OpenSSL/__init__.py b/src/OpenSSL/__init__.py index 810d00d..11e896a 100644 --- a/src/OpenSSL/__init__.py +++ b/src/OpenSSL/__init__.py @@ -7,14 +7,26 @@ pyOpenSSL - A simple wrapper around the OpenSSL library from OpenSSL import crypto, SSL from OpenSSL.version import ( - __author__, __copyright__, __email__, __license__, __summary__, __title__, - __uri__, __version__, + __author__, + __copyright__, + __email__, + __license__, + __summary__, + __title__, + __uri__, + __version__, ) __all__ = [ - "SSL", "crypto", - - "__author__", "__copyright__", "__email__", "__license__", "__summary__", - "__title__", "__uri__", "__version__", + "SSL", + "crypto", + "__author__", + "__copyright__", + "__email__", + "__license__", + "__summary__", + "__title__", + "__uri__", + "__version__", ] diff --git a/src/OpenSSL/_util.py b/src/OpenSSL/_util.py index 9f2d724..1beefe6 100644 --- a/src/OpenSSL/_util.py +++ b/src/OpenSSL/_util.py @@ -46,10 +46,13 @@ def exception_from_error_queue(exception_type): error = lib.ERR_get_error() if error == 0: break - errors.append(( - text(lib.ERR_lib_error_string(error)), - text(lib.ERR_func_error_string(error)), - text(lib.ERR_reason_error_string(error)))) + errors.append( + ( + text(lib.ERR_lib_error_string(error)), + text(lib.ERR_func_error_string(error)), + text(lib.ERR_reason_error_string(error)), + ) + ) raise exception_type(errors) @@ -59,6 +62,7 @@ def make_assert(error): Create an assert function that uses :func:`exception_from_error_queue` to raise an exception wrapped by *error*. """ + def openssl_assert(ok): """ If *ok* is not True, retrieve the error from OpenSSL and raise it. @@ -108,9 +112,13 @@ def path_string(s): if PY2: + def byte_string(s): return s + + else: + def byte_string(s): return s.encode("charmap") @@ -141,9 +149,9 @@ def text_to_bytes_and_warn(label, obj): warnings.warn( _TEXT_WARNING.format(label), category=DeprecationWarning, - stacklevel=3 + stacklevel=3, ) - return obj.encode('utf-8') + return obj.encode("utf-8") return obj diff --git a/src/OpenSSL/crypto.py b/src/OpenSSL/crypto.py index 30dd478..0744ca7 100644 --- a/src/OpenSSL/crypto.py +++ b/src/OpenSSL/crypto.py @@ -7,7 +7,8 @@ from operator import __eq__, __ne__, __lt__, __le__, __gt__, __ge__ from six import ( integer_types as _integer_types, text_type as _text_type, - PY2 as _PY2) + PY2 as _PY2, +) from cryptography import x509 from cryptography.hazmat.primitives.asymmetric import dsa, rsa @@ -24,42 +25,42 @@ from OpenSSL._util import ( ) __all__ = [ - 'FILETYPE_PEM', - 'FILETYPE_ASN1', - 'FILETYPE_TEXT', - 'TYPE_RSA', - 'TYPE_DSA', - 'Error', - 'PKey', - 'get_elliptic_curves', - 'get_elliptic_curve', - 'X509Name', - 'X509Extension', - 'X509Req', - 'X509', - 'X509StoreFlags', - 'X509Store', - 'X509StoreContextError', - 'X509StoreContext', - 'load_certificate', - 'dump_certificate', - 'dump_publickey', - 'dump_privatekey', - 'Revoked', - 'CRL', - 'PKCS7', - 'PKCS12', - 'NetscapeSPKI', - 'load_publickey', - 'load_privatekey', - 'dump_certificate_request', - 'load_certificate_request', - 'sign', - 'verify', - 'dump_crl', - 'load_crl', - 'load_pkcs7_data', - 'load_pkcs12' + "FILETYPE_PEM", + "FILETYPE_ASN1", + "FILETYPE_TEXT", + "TYPE_RSA", + "TYPE_DSA", + "Error", + "PKey", + "get_elliptic_curves", + "get_elliptic_curve", + "X509Name", + "X509Extension", + "X509Req", + "X509", + "X509StoreFlags", + "X509Store", + "X509StoreContextError", + "X509StoreContext", + "load_certificate", + "dump_certificate", + "dump_publickey", + "dump_privatekey", + "Revoked", + "CRL", + "PKCS7", + "PKCS12", + "NetscapeSPKI", + "load_publickey", + "load_privatekey", + "dump_certificate_request", + "load_certificate_request", + "sign", + "verify", + "dump_crl", + "load_crl", + "load_pkcs7_data", + "load_pkcs12", ] FILETYPE_PEM = _lib.SSL_FILETYPE_PEM @@ -93,6 +94,7 @@ def _get_backend(): triggering this side effect unless _get_backend is called. """ from cryptography.hazmat.backends.openssl.backend import backend + return backend @@ -135,7 +137,7 @@ def _bio_to_string(bio): """ Copy the contents of an OpenSSL BIO object into a Python byte string. """ - result_buffer = _ffi.new('char**') + result_buffer = _ffi.new("char**") buffer_length = _lib.BIO_get_mem_data(bio, result_buffer) return _ffi.buffer(result_buffer[0], buffer_length)[:] @@ -172,7 +174,7 @@ def _get_asn1_time(timestamp): @return: The time value from C{timestamp} as a L{bytes} string in a certain format. Or C{None} if the object contains no time value. """ - string_timestamp = _ffi.cast('ASN1_STRING*', timestamp) + string_timestamp = _ffi.cast("ASN1_STRING*", timestamp) if _lib.ASN1_STRING_length(string_timestamp) == 0: return None elif ( @@ -195,7 +197,8 @@ def _get_asn1_time(timestamp): _untested_error("ASN1_TIME_to_generalizedtime") else: string_timestamp = _ffi.cast( - "ASN1_STRING*", generalized_timestamp[0]) + "ASN1_STRING*", generalized_timestamp[0] + ) string_data = _lib.ASN1_STRING_data(string_timestamp) string_result = _ffi.string(string_data) _lib.ASN1_GENERALIZEDTIME_free(generalized_timestamp[0]) @@ -219,6 +222,7 @@ class PKey(object): """ A class representing an DSA or RSA public key or key pair. """ + _only_public = False _initialized = True @@ -257,8 +261,15 @@ class PKey(object): .. versionadded:: 16.1.0 """ pkey = cls() - if not isinstance(crypto_key, (rsa.RSAPublicKey, rsa.RSAPrivateKey, - dsa.DSAPublicKey, dsa.DSAPrivateKey)): + if not isinstance( + crypto_key, + ( + rsa.RSAPublicKey, + rsa.RSAPrivateKey, + dsa.DSAPublicKey, + dsa.DSAPrivateKey, + ), + ): raise TypeError("Unsupported key type") pkey._pkey = crypto_key._evp_pkey @@ -375,6 +386,7 @@ class _EllipticCurve(object): instances each of which represents one curve supported by the system. @type _curves: :py:type:`NoneType` or :py:type:`set` """ + _curves = None if not _PY2: @@ -401,14 +413,12 @@ class _EllipticCurve(object): elliptic curves the underlying library supports. """ num_curves = lib.EC_get_builtin_curves(_ffi.NULL, 0) - builtin_curves = _ffi.new('EC_builtin_curve[]', num_curves) + builtin_curves = _ffi.new("EC_builtin_curve[]", num_curves) # The return value on this call should be num_curves again. We # could check it to make sure but if it *isn't* then.. what could # we do? Abort the whole process, I suppose...? -exarkun lib.EC_get_builtin_curves(builtin_curves, num_curves) - return set( - cls.from_nid(lib, c.nid) - for c in builtin_curves) + return set(cls.from_nid(lib, c.nid) for c in builtin_curves) @classmethod def _get_elliptic_curves(cls, lib): @@ -541,14 +551,16 @@ class X509Name(object): self._name = _ffi.gc(name, _lib.X509_NAME_free) def __setattr__(self, name, value): - if name.startswith('_'): + if name.startswith("_"): return super(X509Name, self).__setattr__(name, value) # Note: we really do not want str subclasses here, so we do not use # isinstance. if type(name) is not str: - raise TypeError("attribute name must be string, not '%.200s'" % ( - type(value).__name__,)) + raise TypeError( + "attribute name must be string, not '%.200s'" + % (type(value).__name__,) + ) nid = _lib.OBJ_txt2nid(_byte_string(name)) if nid == _lib.NID_undef: @@ -569,10 +581,11 @@ class X509Name(object): break if isinstance(value, _text_type): - value = value.encode('utf-8') + value = value.encode("utf-8") add_result = _lib.X509_NAME_add_entry_by_NID( - self._name, nid, _lib.MBSTRING_UTF8, value, -1, -1, 0) + self._name, nid, _lib.MBSTRING_UTF8, value, -1, -1, 0 + ) if not add_result: _raise_current_error() @@ -608,9 +621,9 @@ class X509Name(object): _openssl_assert(data_length >= 0) try: - result = _ffi.buffer( - result_buffer[0], data_length - )[:].decode('utf-8') + result = _ffi.buffer(result_buffer[0], data_length)[:].decode( + "utf-8" + ) finally: # XXX untested _lib.OPENSSL_free(result_buffer[0]) @@ -622,6 +635,7 @@ class X509Name(object): return NotImplemented result = _lib.X509_NAME_cmp(self._name, other._name) return op(result, 0) + return f __eq__ = _cmp(__eq__) @@ -639,11 +653,13 @@ class X509Name(object): """ result_buffer = _ffi.new("char[]", 512) format_result = _lib.X509_NAME_oneline( - self._name, result_buffer, len(result_buffer)) + self._name, result_buffer, len(result_buffer) + ) _openssl_assert(format_result != _ffi.NULL) return "<X509Name object '%s'>" % ( - _native(_ffi.string(result_buffer)),) + _native(_ffi.string(result_buffer)), + ) def hash(self): """ @@ -664,7 +680,7 @@ class X509Name(object): :return: The DER encoded form of this name. :rtype: :py:class:`bytes` """ - result_buffer = _ffi.new('unsigned char**') + result_buffer = _ffi.new("unsigned char**") encode_result = _lib.i2d_X509_NAME(self._name, result_buffer) _openssl_assert(encode_result >= 0) @@ -691,8 +707,9 @@ class X509Name(object): # ffi.string does not handle strings containing NULL bytes # (which may have been generated by old, broken software) - value = _ffi.buffer(_lib.ASN1_STRING_data(fval), - _lib.ASN1_STRING_length(fval))[:] + value = _ffi.buffer( + _lib.ASN1_STRING_data(fval), _lib.ASN1_STRING_length(fval) + )[:] result.append((_ffi.string(name), value)) return result @@ -793,7 +810,8 @@ class X509Extension(object): parts.append(_native(_bio_to_string(bio))) else: value = _native( - _ffi.buffer(name.d.ia5.data, name.d.ia5.length)[:]) + _ffi.buffer(name.d.ia5.data, name.d.ia5.length)[:] + ) parts.append(label + ":" + value) return ", ".join(parts) @@ -843,7 +861,7 @@ class X509Extension(object): .. versionadded:: 0.12 """ octet_result = _lib.X509_EXTENSION_get_data(self._extension) - string_result = _ffi.cast('ASN1_STRING*', octet_result) + string_result = _ffi.cast("ASN1_STRING*", octet_result) char_result = _lib.ASN1_STRING_data(string_result) result_length = _lib.ASN1_STRING_length(string_result) return _ffi.buffer(char_result, result_length)[:] @@ -869,8 +887,9 @@ class X509Req(object): .. versionadded:: 17.1.0 """ from cryptography.hazmat.backends.openssl.x509 import ( - _CertificateSigningRequest + _CertificateSigningRequest, ) + backend = _get_backend() return _CertificateSigningRequest(backend, self._req) @@ -1052,6 +1071,7 @@ class X509(object): """ An X.509 certificate. """ + def __init__(self): x509 = _lib.X509_new() _openssl_assert(x509 != _ffi.NULL) @@ -1077,6 +1097,7 @@ class X509(object): .. versionadded:: 17.1.0 """ from cryptography.hazmat.backends.openssl.x509 import _Certificate + backend = _get_backend() return _Certificate(backend, self._x509) @@ -1218,12 +1239,16 @@ class X509(object): result_length[0] = len(result_buffer) digest_result = _lib.X509_digest( - self._x509, digest, result_buffer, result_length) + self._x509, digest, result_buffer, result_length + ) _openssl_assert(digest_result == 1) - return b":".join([ - b16encode(ch).upper() for ch - in _ffi.buffer(result_buffer, result_length[0])]) + return b":".join( + [ + b16encode(ch).upper() + for ch in _ffi.buffer(result_buffer, result_length[0]) + ] + ) def subject_name_hash(self): """ @@ -1248,7 +1273,7 @@ class X509(object): hex_serial = hex(serial)[2:] if not isinstance(hex_serial, bytes): - hex_serial = hex_serial.encode('ascii') + hex_serial = hex_serial.encode("ascii") bignum_serial = _ffi.new("BIGNUM**") @@ -1259,7 +1284,8 @@ class X509(object): if bignum_serial[0] == _ffi.NULL: set_result = _lib.ASN1_INTEGER_set( - _lib.X509_get_serialNumber(self._x509), small_serial) + _lib.X509_get_serialNumber(self._x509), small_serial + ) if set_result: # TODO Not tested _raise_current_error() @@ -1524,6 +1550,7 @@ class X509StoreFlags(object): .. _OpenSSL Verification Flags: https://www.openssl.org/docs/manmaster/man3/X509_VERIFY_PARAM_set_flags.html """ + CRL_CHECK = _lib.X509_V_FLAG_CRL_CHECK CRL_CHECK_ALL = _lib.X509_V_FLAG_CRL_CHECK_ALL IGNORE_CRITICAL = _lib.X509_V_FLAG_IGNORE_CRITICAL @@ -1644,7 +1671,7 @@ class X509Store(object): param = _lib.X509_VERIFY_PARAM_new() param = _ffi.gc(param, _lib.X509_VERIFY_PARAM_free) - _lib.X509_VERIFY_PARAM_set_time(param, int(vfy_time.strftime('%s'))) + _lib.X509_VERIFY_PARAM_set_time(param, int(vfy_time.strftime("%s"))) _openssl_assert(_lib.X509_STORE_set1_param(self._store, param) != 0) @@ -1722,8 +1749,13 @@ class X509StoreContext(object): errors = [ _lib.X509_STORE_CTX_get_error(self._store_ctx), _lib.X509_STORE_CTX_get_error_depth(self._store_ctx), - _native(_ffi.string(_lib.X509_verify_cert_error_string( - _lib.X509_STORE_CTX_get_error(self._store_ctx)))), + _native( + _ffi.string( + _lib.X509_verify_cert_error_string( + _lib.X509_STORE_CTX_get_error(self._store_ctx) + ) + ) + ), ] # A context error should always be associated with a certificate, so we # expect this call to never return :class:`None`. @@ -1787,8 +1819,7 @@ def load_certificate(type, buffer): elif type == FILETYPE_ASN1: x509 = _lib.d2i_X509_bio(bio, _ffi.NULL) else: - raise ValueError( - "type argument must be FILETYPE_PEM or FILETYPE_ASN1") + raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1") if x509 == _ffi.NULL: _raise_current_error() @@ -1817,7 +1848,8 @@ def dump_certificate(type, cert): else: raise ValueError( "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or " - "FILETYPE_TEXT") + "FILETYPE_TEXT" + ) _openssl_assert(result_code == 1) return _bio_to_string(bio) @@ -1873,7 +1905,8 @@ def dump_privatekey(type, pkey, cipher=None, passphrase=None): if passphrase is None: raise TypeError( "if a value is given for cipher " - "one must also be given for passphrase") + "one must also be given for passphrase" + ) cipher_obj = _lib.EVP_get_cipherbyname(_byte_string(cipher)) if cipher_obj == _ffi.NULL: raise ValueError("Invalid cipher name") @@ -1883,8 +1916,14 @@ def dump_privatekey(type, pkey, cipher=None, passphrase=None): helper = _PassphraseHelper(type, passphrase) if type == FILETYPE_PEM: result_code = _lib.PEM_write_bio_PrivateKey( - bio, pkey._pkey, cipher_obj, _ffi.NULL, 0, - helper.callback, helper.callback_args) + bio, + pkey._pkey, + cipher_obj, + _ffi.NULL, + 0, + helper.callback, + helper.callback_args, + ) helper.raise_if_problem() elif type == FILETYPE_ASN1: result_code = _lib.i2d_PrivateKey_bio(bio, pkey._pkey) @@ -1892,15 +1931,13 @@ def dump_privatekey(type, pkey, cipher=None, passphrase=None): if _lib.EVP_PKEY_id(pkey._pkey) != _lib.EVP_PKEY_RSA: raise TypeError("Only RSA keys are supported for FILETYPE_TEXT") - rsa = _ffi.gc( - _lib.EVP_PKEY_get1_RSA(pkey._pkey), - _lib.RSA_free - ) + rsa = _ffi.gc(_lib.EVP_PKEY_get1_RSA(pkey._pkey), _lib.RSA_free) result_code = _lib.RSA_print(bio, rsa, 0) else: raise ValueError( "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or " - "FILETYPE_TEXT") + "FILETYPE_TEXT" + ) _openssl_assert(result_code != 0) @@ -1911,6 +1948,7 @@ class Revoked(object): """ A certificate revocation. """ + # https://www.openssl.org/docs/manmaster/man5/x509v3_config.html#CRL-distribution-points # which differs from crl_reasons of crypto/x509v3/v3_enum.c that matches # OCSP_crl_reason_str. We use the latter, just like the command line @@ -1950,7 +1988,8 @@ class Revoked(object): asn1_serial = _ffi.gc( _lib.BN_to_ASN1_INTEGER(bignum_serial, _ffi.NULL), - _lib.ASN1_INTEGER_free) + _lib.ASN1_INTEGER_free, + ) _lib.X509_REVOKED_set_serialNumber(self._revoked, asn1_serial) def get_serial(self): @@ -2001,7 +2040,7 @@ class Revoked(object): elif not isinstance(reason, bytes): raise TypeError("reason must be None or a byte string") else: - reason = reason.lower().replace(b' ', b'') + reason = reason.lower().replace(b" ", b"") reason_code = [r.lower() for r in self._crl_reasons].index(reason) new_reason_ext = _lib.ASN1_ENUMERATED_new() @@ -2013,7 +2052,8 @@ class Revoked(object): self._delete_reason() add_result = _lib.X509_REVOKED_add1_ext_i2d( - self._revoked, _lib.NID_crl_reason, new_reason_ext, 0, 0) + self._revoked, _lib.NID_crl_reason, new_reason_ext, 0, 0 + ) _openssl_assert(add_result == 1) def get_reason(self): @@ -2095,8 +2135,9 @@ class CRL(object): .. versionadded:: 17.1.0 """ from cryptography.hazmat.backends.openssl.x509 import ( - _CertificateRevocationList + _CertificateRevocationList, ) + backend = _get_backend() return _CertificateRevocationList(backend, self._crl) @@ -2236,13 +2277,15 @@ class CRL(object): digest_obj = _lib.EVP_get_digestbyname(digest) _openssl_assert(digest_obj != _ffi.NULL) _lib.X509_CRL_set_issuer_name( - self._crl, _lib.X509_get_subject_name(issuer_cert._x509)) + self._crl, _lib.X509_get_subject_name(issuer_cert._x509) + ) _lib.X509_CRL_sort(self._crl) result = _lib.X509_CRL_sign(self._crl, issuer_key._pkey, digest_obj) _openssl_assert(result != 0) - def export(self, cert, key, type=FILETYPE_PEM, days=100, - digest=_UNSPECIFIED): + def export( + self, cert, key, type=FILETYPE_PEM, days=100, digest=_UNSPECIFIED + ): """ Export the CRL as a string. @@ -2500,10 +2543,17 @@ class PKCS12(object): cert = self._cert._x509 pkcs12 = _lib.PKCS12_create( - passphrase, friendlyname, pkey, cert, cacerts, + passphrase, + friendlyname, + pkey, + cert, + cacerts, _lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC, _lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC, - iter, maciter, 0) + iter, + maciter, + 0, + ) if pkcs12 == _ffi.NULL: _raise_current_error() pkcs12 = _ffi.gc(pkcs12, _lib.PKCS12_free) @@ -2667,7 +2717,7 @@ class _PassphraseHelper(object): "passphrase returned by callback is too long" ) for i in range(len(result)): - buf[i] = result[i:i + 1] + buf[i] = result[i : i + 1] return len(result) except Exception as e: self._problems.append(e) @@ -2692,7 +2742,8 @@ def load_publickey(type, buffer): if type == FILETYPE_PEM: evp_pkey = _lib.PEM_read_bio_PUBKEY( - bio, _ffi.NULL, _ffi.NULL, _ffi.NULL) + bio, _ffi.NULL, _ffi.NULL, _ffi.NULL + ) elif type == FILETYPE_ASN1: evp_pkey = _lib.d2i_PUBKEY_bio(bio, _ffi.NULL) else: @@ -2728,7 +2779,8 @@ def load_privatekey(type, buffer, passphrase=None): helper = _PassphraseHelper(type, passphrase) if type == FILETYPE_PEM: evp_pkey = _lib.PEM_read_bio_PrivateKey( - bio, _ffi.NULL, helper.callback, helper.callback_args) + bio, _ffi.NULL, helper.callback, helper.callback_args + ) helper.raise_if_problem() elif type == FILETYPE_ASN1: evp_pkey = _lib.d2i_PrivateKey_bio(bio, _ffi.NULL) @@ -2827,7 +2879,8 @@ def sign(pkey, data, digest): signature_buffer = _ffi.new("unsigned char[]", length) signature_length = _ffi.new("unsigned int *") final_result = _lib.EVP_SignFinal( - md_ctx, signature_buffer, signature_length, pkey._pkey) + md_ctx, signature_buffer, signature_length, pkey._pkey + ) _openssl_assert(final_result == 1) return _ffi.buffer(signature_buffer, signature_length[0])[:] @@ -2891,7 +2944,8 @@ def dump_crl(type, crl): else: raise ValueError( "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or " - "FILETYPE_TEXT") + "FILETYPE_TEXT" + ) _openssl_assert(ret == 1) return _bio_to_string(bio) @@ -3061,4 +3115,4 @@ _lib.SSL_load_error_strings() # Set the default string mask to match OpenSSL upstream (since 2005) and # RFC5280 recommendations. -_lib.ASN1_STRING_set_default_mask_asc(b'utf8only') +_lib.ASN1_STRING_set_default_mask_asc(b"utf8only") diff --git a/src/OpenSSL/version.py b/src/OpenSSL/version.py index 339b9ae..76de33a 100644 --- a/src/OpenSSL/version.py +++ b/src/OpenSSL/version.py @@ -7,8 +7,14 @@ pyOpenSSL - A simple wrapper around the OpenSSL library """ __all__ = [ - "__author__", "__copyright__", "__email__", "__license__", "__summary__", - "__title__", "__uri__", "__version__", + "__author__", + "__copyright__", + "__email__", + "__license__", + "__summary__", + "__title__", + "__uri__", + "__version__", ] __version__ = "20.0.0.dev" |