diff options
author | Jenkins <jenkins@review.openstack.org> | 2014-01-12 20:49:54 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2014-01-12 20:49:54 +0000 |
commit | 9ec2830d0cc5e59b61d1625b233c4000df4a48f2 (patch) | |
tree | cedce441ded2343e77918fe2d09cd37f6ae8a608 | |
parent | 7656a8bab3340fe7a42ab49f41092fa1d9bbf970 (diff) | |
parent | c4d0367d8c2c070f5bedada1c0e8eeadbb9dca48 (diff) | |
download | keystone-9ec2830d0cc5e59b61d1625b233c4000df4a48f2.tar.gz |
Merge "Sync oslo-incubator rpc module"
-rw-r--r-- | keystone/openstack/common/rpc/amqp.py | 7 | ||||
-rw-r--r-- | keystone/openstack/common/rpc/common.py | 6 | ||||
-rw-r--r-- | keystone/openstack/common/rpc/dispatcher.py | 4 | ||||
-rw-r--r-- | keystone/openstack/common/rpc/impl_kombu.py | 2 | ||||
-rw-r--r-- | keystone/openstack/common/rpc/matchmaker.py | 4 | ||||
-rw-r--r-- | keystone/openstack/common/rpc/proxy.py | 4 | ||||
-rw-r--r-- | keystone/openstack/common/rpc/securemessage.py | 521 |
7 files changed, 17 insertions, 531 deletions
diff --git a/keystone/openstack/common/rpc/amqp.py b/keystone/openstack/common/rpc/amqp.py index c8f23e230..a16e12a85 100644 --- a/keystone/openstack/common/rpc/amqp.py +++ b/keystone/openstack/common/rpc/amqp.py @@ -33,6 +33,8 @@ from eventlet import pools from eventlet import queue from eventlet import semaphore from oslo.config import cfg +import six + from keystone.openstack.common import excutils from keystone.openstack.common.gettextutils import _ # noqa @@ -300,10 +302,11 @@ def pack_context(msg, context): """ if isinstance(context, dict): context_d = dict([('_context_%s' % key, value) - for (key, value) in context.iteritems()]) + for (key, value) in six.iteritems(context)]) else: context_d = dict([('_context_%s' % key, value) - for (key, value) in context.to_dict().iteritems()]) + for (key, value) in + six.iteritems(context.to_dict())]) msg.update(context_d) diff --git a/keystone/openstack/common/rpc/common.py b/keystone/openstack/common/rpc/common.py index 42ab16ad4..9290b7bd9 100644 --- a/keystone/openstack/common/rpc/common.py +++ b/keystone/openstack/common/rpc/common.py @@ -34,6 +34,7 @@ CONF = cfg.CONF LOG = logging.getLogger(__name__) +_RPC_ENVELOPE_VERSION = '2.0' '''RPC Envelope Version. This version number applies to the top level structure of messages sent out. @@ -46,7 +47,7 @@ This version number applies to the message envelope that is used in the serialization done inside the rpc layer. See serialize_msg() and deserialize_msg(). -The current message format (version 2.0) is very simple. It is: +The current message format (version 2.0) is very simple. It is:: { 'oslo.version': <RPC Envelope Version as a String>, @@ -64,7 +65,6 @@ We will JSON encode the application message payload. The message envelope, which includes the JSON encoded application message body, will be passed down to the messaging libraries as a dict. ''' -_RPC_ENVELOPE_VERSION = '2.0' _VERSION_KEY = 'oslo.version' _MESSAGE_KEY = 'oslo.message' @@ -86,7 +86,7 @@ class RPCException(Exception): # kwargs doesn't match a variable in the message # log the issue and the kwargs LOG.exception(_('Exception in string format operation')) - for name, value in kwargs.iteritems(): + for name, value in six.iteritems(kwargs): LOG.error("%s: %s" % (name, value)) # at least get the core message out if something happened message = self.msg_fmt diff --git a/keystone/openstack/common/rpc/dispatcher.py b/keystone/openstack/common/rpc/dispatcher.py index 51f0e19b6..2bcfe79af 100644 --- a/keystone/openstack/common/rpc/dispatcher.py +++ b/keystone/openstack/common/rpc/dispatcher.py @@ -81,6 +81,8 @@ On the client side, the same changes should be made as in example 1. The minimum version that supports the new parameter should be specified. """ +import six + from keystone.openstack.common.rpc import common as rpc_common from keystone.openstack.common.rpc import serializer as rpc_serializer @@ -119,7 +121,7 @@ class RpcDispatcher(object): :returns: A new set of deserialized args """ new_kwargs = dict() - for argname, arg in kwargs.iteritems(): + for argname, arg in six.iteritems(kwargs): new_kwargs[argname] = self.serializer.deserialize_entity(context, arg) return new_kwargs diff --git a/keystone/openstack/common/rpc/impl_kombu.py b/keystone/openstack/common/rpc/impl_kombu.py index 9cd2e2ae5..ec1cb2780 100644 --- a/keystone/openstack/common/rpc/impl_kombu.py +++ b/keystone/openstack/common/rpc/impl_kombu.py @@ -445,7 +445,7 @@ class Connection(object): 'virtual_host': self.conf.rabbit_virtual_host, } - for sp_key, value in server_params.iteritems(): + for sp_key, value in six.iteritems(server_params): p_key = server_params_to_kombu_params.get(sp_key, sp_key) params[p_key] = value diff --git a/keystone/openstack/common/rpc/matchmaker.py b/keystone/openstack/common/rpc/matchmaker.py index f7fa5e0cd..b5ea8deb2 100644 --- a/keystone/openstack/common/rpc/matchmaker.py +++ b/keystone/openstack/common/rpc/matchmaker.py @@ -90,7 +90,7 @@ class MatchMakerBase(object): """Acknowledge that a key.host is alive. Used internally for updating heartbeats, but may also be used - publically to acknowledge a system is alive (i.e. rpc message + publicly to acknowledge a system is alive (i.e. rpc message successfully sent to host) """ pass @@ -172,7 +172,7 @@ class HeartbeatMatchMakerBase(MatchMakerBase): """Acknowledge that a host.topic is alive. Used internally for updating heartbeats, but may also be used - publically to acknowledge a system is alive (i.e. rpc message + publicly to acknowledge a system is alive (i.e. rpc message successfully sent to host) """ raise NotImplementedError("Must implement ack_alive") diff --git a/keystone/openstack/common/rpc/proxy.py b/keystone/openstack/common/rpc/proxy.py index f8409a035..ee1709284 100644 --- a/keystone/openstack/common/rpc/proxy.py +++ b/keystone/openstack/common/rpc/proxy.py @@ -19,6 +19,8 @@ For more information about rpc API version numbers, see: rpc/dispatcher.py """ +import six + from keystone.openstack.common import rpc from keystone.openstack.common.rpc import common as rpc_common from keystone.openstack.common.rpc import serializer as rpc_serializer @@ -97,7 +99,7 @@ class RpcProxy(object): :returns: A new set of serialized arguments """ new_kwargs = dict() - for argname, arg in kwargs.iteritems(): + for argname, arg in six.iteritems(kwargs): new_kwargs[argname] = self.serializer.serialize_entity(context, arg) return new_kwargs diff --git a/keystone/openstack/common/rpc/securemessage.py b/keystone/openstack/common/rpc/securemessage.py deleted file mode 100644 index ef5e191c2..000000000 --- a/keystone/openstack/common/rpc/securemessage.py +++ /dev/null @@ -1,521 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2013 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import base64 -import collections -import os -import struct -import time - -import requests - -from oslo.config import cfg - -from keystone.openstack.common.crypto import utils as cryptoutils -from keystone.openstack.common import jsonutils -from keystone.openstack.common import log as logging - -secure_message_opts = [ - cfg.BoolOpt('enabled', default=True, - help='Whether Secure Messaging (Signing) is enabled,' - ' defaults to enabled'), - cfg.BoolOpt('enforced', default=False, - help='Whether Secure Messaging (Signing) is enforced,' - ' defaults to not enforced'), - cfg.BoolOpt('encrypt', default=False, - help='Whether Secure Messaging (Encryption) is enabled,' - ' defaults to not enabled'), - cfg.StrOpt('secret_keys_file', - help='Path to the file containing the keys, takes precedence' - ' over secret_key'), - cfg.MultiStrOpt('secret_key', - help='A list of keys: (ex: name:<base64 encoded key>),' - ' ignored if secret_keys_file is set'), - cfg.StrOpt('kds_endpoint', - help='KDS endpoint (ex: http://kds.example.com:35357/v3)'), -] -secure_message_group = cfg.OptGroup('secure_messages', - title='Secure Messaging options') - -LOG = logging.getLogger(__name__) - - -class SecureMessageException(Exception): - """Generic Exception for Secure Messages.""" - - msg = "An unknown Secure Message related exception occurred." - - def __init__(self, msg=None): - if msg is None: - msg = self.msg - super(SecureMessageException, self).__init__(msg) - - -class SharedKeyNotFound(SecureMessageException): - """No shared key was found and no other external authentication mechanism - is available. - """ - - msg = "Shared Key for [%s] Not Found. (%s)" - - def __init__(self, name, errmsg): - super(SharedKeyNotFound, self).__init__(self.msg % (name, errmsg)) - - -class InvalidMetadata(SecureMessageException): - """The metadata is invalid.""" - - msg = "Invalid metadata: %s" - - def __init__(self, err): - super(InvalidMetadata, self).__init__(self.msg % err) - - -class InvalidSignature(SecureMessageException): - """Signature validation failed.""" - - msg = "Failed to validate signature (source=%s, destination=%s)" - - def __init__(self, src, dst): - super(InvalidSignature, self).__init__(self.msg % (src, dst)) - - -class UnknownDestinationName(SecureMessageException): - """The Destination name is unknown to us.""" - - msg = "Invalid destination name (%s)" - - def __init__(self, name): - super(UnknownDestinationName, self).__init__(self.msg % name) - - -class InvalidEncryptedTicket(SecureMessageException): - """The Encrypted Ticket could not be successfully handled.""" - - msg = "Invalid Ticket (source=%s, destination=%s)" - - def __init__(self, src, dst): - super(InvalidEncryptedTicket, self).__init__(self.msg % (src, dst)) - - -class InvalidExpiredTicket(SecureMessageException): - """The ticket received is already expired.""" - - msg = "Expired ticket (source=%s, destination=%s)" - - def __init__(self, src, dst): - super(InvalidExpiredTicket, self).__init__(self.msg % (src, dst)) - - -class CommunicationError(SecureMessageException): - """The Communication with the KDS failed.""" - - msg = "Communication Error (target=%s): %s" - - def __init__(self, target, errmsg): - super(CommunicationError, self).__init__(self.msg % (target, errmsg)) - - -class InvalidArgument(SecureMessageException): - """Bad initialization argument.""" - - msg = "Invalid argument: %s" - - def __init__(self, errmsg): - super(InvalidArgument, self).__init__(self.msg % errmsg) - - -Ticket = collections.namedtuple('Ticket', ['skey', 'ekey', 'esek']) - - -class KeyStore(object): - """A storage class for Signing and Encryption Keys. - - This class creates an object that holds Generic Keys like Signing - Keys, Encryption Keys, Encrypted SEK Tickets ... - """ - - def __init__(self): - self._kvps = dict() - - def _get_key_name(self, source, target, ktype): - return (source, target, ktype) - - def _put(self, src, dst, ktype, expiration, data): - name = self._get_key_name(src, dst, ktype) - self._kvps[name] = (expiration, data) - - def _get(self, src, dst, ktype): - name = self._get_key_name(src, dst, ktype) - if name in self._kvps: - expiration, data = self._kvps[name] - if expiration > time.time(): - return data - else: - del self._kvps[name] - - return None - - def clear(self): - """Wipes the store clear of all data.""" - self._kvps.clear() - - def put_ticket(self, source, target, skey, ekey, esek, expiration): - """Puts a sek pair in the cache. - - :param source: Client name - :param target: Target name - :param skey: The Signing Key - :param ekey: The Encription Key - :param esek: The token encrypted with the target key - :param expiration: Expiration time in seconds since Epoch - """ - keys = Ticket(skey, ekey, esek) - self._put(source, target, 'ticket', expiration, keys) - - def get_ticket(self, source, target): - """Returns a Ticket (skey, ekey, esek) namedtuple for the - source/target pair. - """ - return self._get(source, target, 'ticket') - - -_KEY_STORE = KeyStore() - - -class _KDSClient(object): - - USER_AGENT = 'oslo-incubator/rpc' - - def __init__(self, endpoint=None, timeout=None): - """A KDS Client class.""" - - self._endpoint = endpoint - if timeout is not None: - self.timeout = float(timeout) - else: - self.timeout = None - - def _do_get(self, url, request): - req_kwargs = dict() - req_kwargs['headers'] = dict() - req_kwargs['headers']['User-Agent'] = self.USER_AGENT - req_kwargs['headers']['Content-Type'] = 'application/json' - req_kwargs['data'] = jsonutils.dumps({'request': request}) - if self.timeout is not None: - req_kwargs['timeout'] = self.timeout - - try: - resp = requests.get(url, **req_kwargs) - except requests.ConnectionError as e: - err = "Unable to establish connection. %s" % e - raise CommunicationError(url, err) - - return resp - - def _get_reply(self, url, resp): - if resp.text: - try: - body = jsonutils.loads(resp.text) - reply = body['reply'] - except (KeyError, TypeError, ValueError): - msg = "Failed to decode reply: %s" % resp.text - raise CommunicationError(url, msg) - else: - msg = "No reply data was returned." - raise CommunicationError(url, msg) - - return reply - - def _get_ticket(self, request, url=None, redirects=10): - """Send an HTTP request. - - Wraps around 'requests' to handle redirects and common errors. - """ - if url is None: - if not self._endpoint: - raise CommunicationError(url, 'Endpoint not configured') - url = self._endpoint + '/kds/ticket' - - while redirects: - resp = self._do_get(url, request) - if resp.status_code in (301, 302, 305): - # Redirected. Reissue the request to the new location. - url = resp.headers['location'] - redirects -= 1 - continue - elif resp.status_code != 200: - msg = "Request returned failure status: %s (%s)" - err = msg % (resp.status_code, resp.text) - raise CommunicationError(url, err) - - return self._get_reply(url, resp) - - raise CommunicationError(url, "Too many redirections, giving up!") - - def get_ticket(self, source, target, crypto, key): - - # prepare metadata - md = {'requestor': source, - 'target': target, - 'timestamp': time.time(), - 'nonce': struct.unpack('Q', os.urandom(8))[0]} - metadata = base64.b64encode(jsonutils.dumps(md)) - - # sign metadata - signature = crypto.sign(key, metadata) - - # HTTP request - reply = self._get_ticket({'metadata': metadata, - 'signature': signature}) - - # verify reply - signature = crypto.sign(key, (reply['metadata'] + reply['ticket'])) - if signature != reply['signature']: - raise InvalidEncryptedTicket(md['source'], md['destination']) - md = jsonutils.loads(base64.b64decode(reply['metadata'])) - if ((md['source'] != source or - md['destination'] != target or - md['expiration'] < time.time())): - raise InvalidEncryptedTicket(md['source'], md['destination']) - - # return ticket data - tkt = jsonutils.loads(crypto.decrypt(key, reply['ticket'])) - - return tkt, md['expiration'] - - -# we need to keep a global nonce, as this value should never repeat non -# matter how many SecureMessage objects we create -_NONCE = None - - -def _get_nonce(): - """We keep a single counter per instance, as it is so huge we can't - possibly cycle through within 1/100 of a second anyway. - """ - - global _NONCE - # Lazy initialize, for now get a random value, multiply by 2^32 and - # use it as the nonce base. The counter itself will rotate after - # 2^32 increments. - if _NONCE is None: - _NONCE = [struct.unpack('I', os.urandom(4))[0], 0] - - # Increment counter and wrap at 2^32 - _NONCE[1] += 1 - if _NONCE[1] > 0xffffffff: - _NONCE[1] = 0 - - # Return base + counter - return long((_NONCE[0] * 0xffffffff)) + _NONCE[1] - - -class SecureMessage(object): - """A Secure Message object. - - This class creates a signing/encryption facility for RPC messages. - It encapsulates all the necessary crypto primitives to insulate - regular code from the intricacies of message authentication, validation - and optionally encryption. - - :param topic: The topic name of the queue - :param host: The server name, together with the topic it forms a unique - name that is used to source signing keys, and verify - incoming messages. - :param conf: a ConfigOpts object - :param key: (optional) explicitly pass in endpoint private key. - If not provided it will be sourced from the service config - :param key_store: (optional) Storage class for local caching - :param encrypt: (defaults to False) Whether to encrypt messages - :param enctype: (defaults to AES) Cipher to use - :param hashtype: (defaults to SHA256) Hash function to use for signatures - """ - - def __init__(self, topic, host, conf, key=None, key_store=None, - encrypt=None, enctype='AES', hashtype='SHA256'): - - conf.register_group(secure_message_group) - conf.register_opts(secure_message_opts, group='secure_messages') - - self._name = '%s.%s' % (topic, host) - self._key = key - self._conf = conf.secure_messages - self._encrypt = self._conf.encrypt if (encrypt is None) else encrypt - self._crypto = cryptoutils.SymmetricCrypto(enctype, hashtype) - self._hkdf = cryptoutils.HKDF(hashtype) - self._kds = _KDSClient(self._conf.kds_endpoint) - - if self._key is None: - self._key = self._init_key(topic, self._name) - if self._key is None: - err = "Secret Key (or key file) is missing or malformed" - raise SharedKeyNotFound(self._name, err) - - self._key_store = key_store or _KEY_STORE - - def _init_key(self, topic, name): - keys = None - if self._conf.secret_keys_file: - with open(self._conf.secret_keys_file, 'r') as f: - keys = f.readlines() - elif self._conf.secret_key: - keys = self._conf.secret_key - - if keys is None: - return None - - for k in keys: - if k[0] == '#': - continue - if ':' not in k: - break - svc, key = k.split(':', 1) - if svc == topic or svc == name: - return base64.b64decode(key) - - return None - - def _split_key(self, key, size): - sig_key = key[:size] - enc_key = key[size:] - return sig_key, enc_key - - def _decode_esek(self, key, source, target, timestamp, esek): - """This function decrypts the esek buffer passed in and returns a - KeyStore to be used to check and decrypt the received message. - - :param key: The key to use to decrypt the ticket (esek) - :param source: The name of the source service - :param traget: The name of the target service - :param timestamp: The incoming message timestamp - :param esek: a base64 encoded encrypted block containing a JSON string - """ - rkey = None - - try: - s = self._crypto.decrypt(key, esek) - j = jsonutils.loads(s) - - rkey = base64.b64decode(j['key']) - expiration = j['timestamp'] + j['ttl'] - if j['timestamp'] > timestamp or timestamp > expiration: - raise InvalidExpiredTicket(source, target) - - except Exception: - raise InvalidEncryptedTicket(source, target) - - info = '%s,%s,%s' % (source, target, str(j['timestamp'])) - - sek = self._hkdf.expand(rkey, info, len(key) * 2) - - return self._split_key(sek, len(key)) - - def _get_ticket(self, target): - """This function will check if we already have a SEK for the specified - target in the cache, or will go and try to fetch a new SEK from the key - server. - - :param target: The name of the target service - """ - ticket = self._key_store.get_ticket(self._name, target) - - if ticket is not None: - return ticket - - tkt, expiration = self._kds.get_ticket(self._name, target, - self._crypto, self._key) - - self._key_store.put_ticket(self._name, target, - base64.b64decode(tkt['skey']), - base64.b64decode(tkt['ekey']), - tkt['esek'], expiration) - return self._key_store.get_ticket(self._name, target) - - def encode(self, version, target, json_msg): - """This is the main encoding function. - - It takes a target and a message and returns a tuple consisting of a - JSON serialized metadata object, a JSON serialized (and optionally - encrypted) message, and a signature. - - :param version: the current envelope version - :param target: The name of the target service (usually with hostname) - :param json_msg: a serialized json message object - """ - ticket = self._get_ticket(target) - - metadata = jsonutils.dumps({'source': self._name, - 'destination': target, - 'timestamp': time.time(), - 'nonce': _get_nonce(), - 'esek': ticket.esek, - 'encryption': self._encrypt}) - - message = json_msg - if self._encrypt: - message = self._crypto.encrypt(ticket.ekey, message) - - signature = self._crypto.sign(ticket.skey, - version + metadata + message) - - return (metadata, message, signature) - - def decode(self, version, metadata, message, signature): - """This is the main decoding function. - - It takes a version, metadata, message and signature strings and - returns a tuple with a (decrypted) message and metadata or raises - an exception in case of error. - - :param version: the current envelope version - :param metadata: a JSON serialized object with metadata for validation - :param message: a JSON serialized (base64 encoded encrypted) message - :param signature: a base64 encoded signature - """ - md = jsonutils.loads(metadata) - - check_args = ('source', 'destination', 'timestamp', - 'nonce', 'esek', 'encryption') - for arg in check_args: - if arg not in md: - raise InvalidMetadata('Missing metadata "%s"' % arg) - - if md['destination'] != self._name: - # TODO(simo) handle group keys by checking target - raise UnknownDestinationName(md['destination']) - - try: - skey, ekey = self._decode_esek(self._key, - md['source'], md['destination'], - md['timestamp'], md['esek']) - except InvalidExpiredTicket: - raise - except Exception: - raise InvalidMetadata('Failed to decode ESEK for %s/%s' % ( - md['source'], md['destination'])) - - sig = self._crypto.sign(skey, version + metadata + message) - - if sig != signature: - raise InvalidSignature(md['source'], md['destination']) - - if md['encryption'] is True: - msg = self._crypto.decrypt(ekey, message) - else: - msg = message - - return (md, msg) |