diff options
author | Victor Stinner <vstinner@redhat.com> | 2015-07-01 17:48:54 +0200 |
---|---|---|
committer | Victor Stinner <vstinner@redhat.com> | 2015-07-10 10:42:32 +0200 |
commit | b2c55421d6fa2cc53254f901b2faa76b09455130 (patch) | |
tree | 3c0022ad70480fec51c15b155fe4f98bfab7ae9f /nova/crypto.py | |
parent | 22c3e7437a2757c9b55a0245c63a1f349209c99a (diff) | |
download | nova-b2c55421d6fa2cc53254f901b2faa76b09455130.tar.gz |
Port crypto to Python 3
Fix bytes vs Unicode issues:
* ssh_encrypt_text() now encodes text to UTF-8 if text type is Unicode.
* Encode Unicode to ASCII before encoding it to base64
* On Python 3, decode binascii.hexlify() and base64.encodestring()
output from ASCII to get Unicode (str type in Python 3) is required.
* convert_from_sshrsa_to_pkcs8(): reuse binascii.hexlify() instead of
a loop using struck.unpack() and "%02X" format.
* Replace str.encode('hex') with binascii.hexlify(str)
* Replace string.strip(text) with text.strip()
* Replace StringIO.StringIO with six.StringIO
* Add test on the output type of ssh_encrypt_text() and decrypt_text()
* Call utils.execute() with binary=True to not decode stdout/stderr
(to get them as raw bytes).
* Replace reduce() with six.moves.reduce()
* convert_version_to_str(): replace a/b with a//b to get integer
division
* tox.ini: add the following tests to Python 3.4
- nova.tests.unit.compute.test_keypairs
- nova.tests.unit.test_crypto
Blueprint nova-python3
Change-Id: I83d927166c0864020b205ac7495473795da7830d
Diffstat (limited to 'nova/crypto.py')
-rw-r--r-- | nova/crypto.py | 48 |
1 files changed, 31 insertions, 17 deletions
diff --git a/nova/crypto.py b/nova/crypto.py index 937c05a062..e59a8b1e0d 100644 --- a/nova/crypto.py +++ b/nova/crypto.py @@ -26,7 +26,6 @@ import base64 import binascii import os import re -import string import struct from oslo_concurrency import processutils @@ -133,7 +132,7 @@ def generate_fingerprint(public_key): try: parts = public_key.split(' ') ssh_alg = parts[0] - pub_data = parts[1].decode('base64') + pub_data = base64.b64decode(parts[1]) if ssh_alg == 'ssh-rsa': pkey = paramiko.RSAKey(data=pub_data) elif ssh_alg == 'ssh-dss': @@ -144,8 +143,10 @@ def generate_fingerprint(public_key): raise exception.InvalidKeypair( reason=_('Unknown ssh key type %s') % ssh_alg) raw_fp = binascii.hexlify(pkey.get_fingerprint()) + if six.PY3: + raw_fp = raw_fp.decode('ascii') return ':'.join(a + b for a, b in zip(raw_fp[::2], raw_fp[1::2])) - except (IndexError, UnicodeDecodeError, binascii.Error, + except (TypeError, IndexError, UnicodeDecodeError, binascii.Error, paramiko.ssh_exception.SSHException): raise exception.InvalidKeypair( reason=_('failed to generate fingerprint')) @@ -153,10 +154,12 @@ def generate_fingerprint(public_key): def generate_x509_fingerprint(pem_key): try: + if isinstance(pem_key, six.text_type): + pem_key = pem_key.encode('utf-8') (out, _err) = utils.execute('openssl', 'x509', '-inform', 'PEM', '-fingerprint', '-noout', process_input=pem_key) - fingerprint = string.strip(out.rpartition('=')[2]) + fingerprint = out.rpartition('=')[2].strip() return fingerprint.lower() except processutils.ProcessExecutionError as ex: raise exception.InvalidKeypair( @@ -166,7 +169,7 @@ def generate_x509_fingerprint(pem_key): def generate_key_pair(bits=2048): key = paramiko.RSAKey.generate(bits) - keyout = six.BytesIO() + keyout = six.StringIO() key.write_private_key(keyout) private_key = keyout.getvalue() public_key = '%s %s Generated-by-Nova' % (key.get_name(), key.get_base64()) @@ -194,7 +197,8 @@ def decrypt_text(project_id, text): 'rsautl', '-decrypt', '-inkey', '%s' % private_key, - process_input=text) + process_input=text, + binary=True) return dec except processutils.ProcessExecutionError as exc: raise exception.DecryptionFailure(reason=exc.stderr) @@ -241,20 +245,20 @@ def convert_from_sshrsa_to_pkcs8(pubkey): # +- INTEGER 65537 # Build the sequence for the bit string - n_val = int( - ''.join(['%02X' % struct.unpack('B', x)[0] for x in parts[2]]), 16) - e_val = int( - ''.join(['%02X' % struct.unpack('B', x)[0] for x in parts[1]]), 16) + n_val = int(binascii.hexlify(parts[2]), 16) + e_val = int(binascii.hexlify(parts[1]), 16) pkinfo = _to_sequence(univ.Integer(n_val), univ.Integer(e_val)) # Convert the sequence into a bit string - pklong = long(der_encoder.encode(pkinfo).encode('hex'), 16) + pklong = int(binascii.hexlify(der_encoder.encode(pkinfo)), 16) pkbitstring = univ.BitString("'00%s'B" % bin(pklong)[2:]) # Build the key data structure oid = _to_sequence(_RSA_OID, univ.Null()) pkcs1_seq = _to_sequence(oid, pkbitstring) - pkcs8 = base64.encodestring(der_encoder.encode(pkcs1_seq)) + pkcs8 = base64.b64encode(der_encoder.encode(pkcs1_seq)) + if six.PY3: + pkcs8 = pkcs8.decode('ascii') # Remove the embedded new line and format the key, each line # should be 64 characters long @@ -264,7 +268,11 @@ def convert_from_sshrsa_to_pkcs8(pubkey): def ssh_encrypt_text(ssh_public_key, text): """Encrypt text with an ssh public key. + + If text is a Unicode string, encode it to UTF-8. """ + if isinstance(text, six.text_type): + text = text.encode('utf-8') with utils.tempdir() as tmpdir: sslkey = os.path.abspath(os.path.join(tmpdir, 'ssl.key')) try: @@ -277,7 +285,8 @@ def ssh_encrypt_text(ssh_public_key, text): '-pubin', '-inkey', sslkey, '-keyform', 'PEM', - process_input=text) + process_input=text, + binary=True) return enc except processutils.ProcessExecutionError as exc: raise exception.EncryptionFailure(reason=exc.stderr) @@ -375,14 +384,19 @@ def generate_winrm_x509_cert(user_id, bits=2048): 'openssl', 'req', '-x509', '-nodes', '-days', '3650', '-config', conffile, '-newkey', 'rsa:%s' % bits, '-outform', 'PEM', '-keyout', keyfile, '-subj', subject, - '-extensions', 'v3_req_client') + '-extensions', 'v3_req_client', + binary=True) (out, _err) = utils.execute('openssl', 'pkcs12', '-export', '-inkey', keyfile, '-password', 'pass:', - process_input=certificate) + process_input=certificate, + binary=True) - private_key = out.encode('base64') + private_key = base64.b64encode(out) fingerprint = generate_x509_fingerprint(certificate) + if six.PY3: + private_key = private_key.decode('ascii') + certificate = certificate.decode('utf-8') return (private_key, certificate, fingerprint) @@ -459,7 +473,7 @@ def _sign_csr(csr_text, ca_folder): './openssl.cnf', '-infiles', inbound) out, _err = utils.execute('openssl', 'x509', '-in', outbound, '-serial', '-noout') - serial = string.strip(out.rpartition('=')[2]) + serial = out.rpartition('=')[2].strip() os.chdir(start) with open(outbound, 'r') as crtfile: |