summaryrefslogtreecommitdiff
path: root/nova/crypto.py
diff options
context:
space:
mode:
authorVictor Stinner <vstinner@redhat.com>2015-07-01 17:48:54 +0200
committerVictor Stinner <vstinner@redhat.com>2015-07-10 10:42:32 +0200
commitb2c55421d6fa2cc53254f901b2faa76b09455130 (patch)
tree3c0022ad70480fec51c15b155fe4f98bfab7ae9f /nova/crypto.py
parent22c3e7437a2757c9b55a0245c63a1f349209c99a (diff)
downloadnova-b2c55421d6fa2cc53254f901b2faa76b09455130.tar.gz
Port crypto to Python 3
Fix bytes vs Unicode issues: * ssh_encrypt_text() now encodes text to UTF-8 if text type is Unicode. * Encode Unicode to ASCII before encoding it to base64 * On Python 3, decode binascii.hexlify() and base64.encodestring() output from ASCII to get Unicode (str type in Python 3) is required. * convert_from_sshrsa_to_pkcs8(): reuse binascii.hexlify() instead of a loop using struck.unpack() and "%02X" format. * Replace str.encode('hex') with binascii.hexlify(str) * Replace string.strip(text) with text.strip() * Replace StringIO.StringIO with six.StringIO * Add test on the output type of ssh_encrypt_text() and decrypt_text() * Call utils.execute() with binary=True to not decode stdout/stderr (to get them as raw bytes). * Replace reduce() with six.moves.reduce() * convert_version_to_str(): replace a/b with a//b to get integer division * tox.ini: add the following tests to Python 3.4 - nova.tests.unit.compute.test_keypairs - nova.tests.unit.test_crypto Blueprint nova-python3 Change-Id: I83d927166c0864020b205ac7495473795da7830d
Diffstat (limited to 'nova/crypto.py')
-rw-r--r--nova/crypto.py48
1 files changed, 31 insertions, 17 deletions
diff --git a/nova/crypto.py b/nova/crypto.py
index 937c05a062..e59a8b1e0d 100644
--- a/nova/crypto.py
+++ b/nova/crypto.py
@@ -26,7 +26,6 @@ import base64
import binascii
import os
import re
-import string
import struct
from oslo_concurrency import processutils
@@ -133,7 +132,7 @@ def generate_fingerprint(public_key):
try:
parts = public_key.split(' ')
ssh_alg = parts[0]
- pub_data = parts[1].decode('base64')
+ pub_data = base64.b64decode(parts[1])
if ssh_alg == 'ssh-rsa':
pkey = paramiko.RSAKey(data=pub_data)
elif ssh_alg == 'ssh-dss':
@@ -144,8 +143,10 @@ def generate_fingerprint(public_key):
raise exception.InvalidKeypair(
reason=_('Unknown ssh key type %s') % ssh_alg)
raw_fp = binascii.hexlify(pkey.get_fingerprint())
+ if six.PY3:
+ raw_fp = raw_fp.decode('ascii')
return ':'.join(a + b for a, b in zip(raw_fp[::2], raw_fp[1::2]))
- except (IndexError, UnicodeDecodeError, binascii.Error,
+ except (TypeError, IndexError, UnicodeDecodeError, binascii.Error,
paramiko.ssh_exception.SSHException):
raise exception.InvalidKeypair(
reason=_('failed to generate fingerprint'))
@@ -153,10 +154,12 @@ def generate_fingerprint(public_key):
def generate_x509_fingerprint(pem_key):
try:
+ if isinstance(pem_key, six.text_type):
+ pem_key = pem_key.encode('utf-8')
(out, _err) = utils.execute('openssl', 'x509', '-inform', 'PEM',
'-fingerprint', '-noout',
process_input=pem_key)
- fingerprint = string.strip(out.rpartition('=')[2])
+ fingerprint = out.rpartition('=')[2].strip()
return fingerprint.lower()
except processutils.ProcessExecutionError as ex:
raise exception.InvalidKeypair(
@@ -166,7 +169,7 @@ def generate_x509_fingerprint(pem_key):
def generate_key_pair(bits=2048):
key = paramiko.RSAKey.generate(bits)
- keyout = six.BytesIO()
+ keyout = six.StringIO()
key.write_private_key(keyout)
private_key = keyout.getvalue()
public_key = '%s %s Generated-by-Nova' % (key.get_name(), key.get_base64())
@@ -194,7 +197,8 @@ def decrypt_text(project_id, text):
'rsautl',
'-decrypt',
'-inkey', '%s' % private_key,
- process_input=text)
+ process_input=text,
+ binary=True)
return dec
except processutils.ProcessExecutionError as exc:
raise exception.DecryptionFailure(reason=exc.stderr)
@@ -241,20 +245,20 @@ def convert_from_sshrsa_to_pkcs8(pubkey):
# +- INTEGER 65537
# Build the sequence for the bit string
- n_val = int(
- ''.join(['%02X' % struct.unpack('B', x)[0] for x in parts[2]]), 16)
- e_val = int(
- ''.join(['%02X' % struct.unpack('B', x)[0] for x in parts[1]]), 16)
+ n_val = int(binascii.hexlify(parts[2]), 16)
+ e_val = int(binascii.hexlify(parts[1]), 16)
pkinfo = _to_sequence(univ.Integer(n_val), univ.Integer(e_val))
# Convert the sequence into a bit string
- pklong = long(der_encoder.encode(pkinfo).encode('hex'), 16)
+ pklong = int(binascii.hexlify(der_encoder.encode(pkinfo)), 16)
pkbitstring = univ.BitString("'00%s'B" % bin(pklong)[2:])
# Build the key data structure
oid = _to_sequence(_RSA_OID, univ.Null())
pkcs1_seq = _to_sequence(oid, pkbitstring)
- pkcs8 = base64.encodestring(der_encoder.encode(pkcs1_seq))
+ pkcs8 = base64.b64encode(der_encoder.encode(pkcs1_seq))
+ if six.PY3:
+ pkcs8 = pkcs8.decode('ascii')
# Remove the embedded new line and format the key, each line
# should be 64 characters long
@@ -264,7 +268,11 @@ def convert_from_sshrsa_to_pkcs8(pubkey):
def ssh_encrypt_text(ssh_public_key, text):
"""Encrypt text with an ssh public key.
+
+ If text is a Unicode string, encode it to UTF-8.
"""
+ if isinstance(text, six.text_type):
+ text = text.encode('utf-8')
with utils.tempdir() as tmpdir:
sslkey = os.path.abspath(os.path.join(tmpdir, 'ssl.key'))
try:
@@ -277,7 +285,8 @@ def ssh_encrypt_text(ssh_public_key, text):
'-pubin',
'-inkey', sslkey,
'-keyform', 'PEM',
- process_input=text)
+ process_input=text,
+ binary=True)
return enc
except processutils.ProcessExecutionError as exc:
raise exception.EncryptionFailure(reason=exc.stderr)
@@ -375,14 +384,19 @@ def generate_winrm_x509_cert(user_id, bits=2048):
'openssl', 'req', '-x509', '-nodes', '-days', '3650',
'-config', conffile, '-newkey', 'rsa:%s' % bits,
'-outform', 'PEM', '-keyout', keyfile, '-subj', subject,
- '-extensions', 'v3_req_client')
+ '-extensions', 'v3_req_client',
+ binary=True)
(out, _err) = utils.execute('openssl', 'pkcs12', '-export',
'-inkey', keyfile, '-password', 'pass:',
- process_input=certificate)
+ process_input=certificate,
+ binary=True)
- private_key = out.encode('base64')
+ private_key = base64.b64encode(out)
fingerprint = generate_x509_fingerprint(certificate)
+ if six.PY3:
+ private_key = private_key.decode('ascii')
+ certificate = certificate.decode('utf-8')
return (private_key, certificate, fingerprint)
@@ -459,7 +473,7 @@ def _sign_csr(csr_text, ca_folder):
'./openssl.cnf', '-infiles', inbound)
out, _err = utils.execute('openssl', 'x509', '-in', outbound,
'-serial', '-noout')
- serial = string.strip(out.rpartition('=')[2])
+ serial = out.rpartition('=')[2].strip()
os.chdir(start)
with open(outbound, 'r') as crtfile: