summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2014-01-12 20:49:54 +0000
committerGerrit Code Review <review@openstack.org>2014-01-12 20:49:54 +0000
commit9ec2830d0cc5e59b61d1625b233c4000df4a48f2 (patch)
treecedce441ded2343e77918fe2d09cd37f6ae8a608
parent7656a8bab3340fe7a42ab49f41092fa1d9bbf970 (diff)
parentc4d0367d8c2c070f5bedada1c0e8eeadbb9dca48 (diff)
downloadkeystone-9ec2830d0cc5e59b61d1625b233c4000df4a48f2.tar.gz
Merge "Sync oslo-incubator rpc module"
-rw-r--r--keystone/openstack/common/rpc/amqp.py7
-rw-r--r--keystone/openstack/common/rpc/common.py6
-rw-r--r--keystone/openstack/common/rpc/dispatcher.py4
-rw-r--r--keystone/openstack/common/rpc/impl_kombu.py2
-rw-r--r--keystone/openstack/common/rpc/matchmaker.py4
-rw-r--r--keystone/openstack/common/rpc/proxy.py4
-rw-r--r--keystone/openstack/common/rpc/securemessage.py521
7 files changed, 17 insertions, 531 deletions
diff --git a/keystone/openstack/common/rpc/amqp.py b/keystone/openstack/common/rpc/amqp.py
index c8f23e230..a16e12a85 100644
--- a/keystone/openstack/common/rpc/amqp.py
+++ b/keystone/openstack/common/rpc/amqp.py
@@ -33,6 +33,8 @@ from eventlet import pools
from eventlet import queue
from eventlet import semaphore
from oslo.config import cfg
+import six
+
from keystone.openstack.common import excutils
from keystone.openstack.common.gettextutils import _ # noqa
@@ -300,10 +302,11 @@ def pack_context(msg, context):
"""
if isinstance(context, dict):
context_d = dict([('_context_%s' % key, value)
- for (key, value) in context.iteritems()])
+ for (key, value) in six.iteritems(context)])
else:
context_d = dict([('_context_%s' % key, value)
- for (key, value) in context.to_dict().iteritems()])
+ for (key, value) in
+ six.iteritems(context.to_dict())])
msg.update(context_d)
diff --git a/keystone/openstack/common/rpc/common.py b/keystone/openstack/common/rpc/common.py
index 42ab16ad4..9290b7bd9 100644
--- a/keystone/openstack/common/rpc/common.py
+++ b/keystone/openstack/common/rpc/common.py
@@ -34,6 +34,7 @@ CONF = cfg.CONF
LOG = logging.getLogger(__name__)
+_RPC_ENVELOPE_VERSION = '2.0'
'''RPC Envelope Version.
This version number applies to the top level structure of messages sent out.
@@ -46,7 +47,7 @@ This version number applies to the message envelope that is used in the
serialization done inside the rpc layer. See serialize_msg() and
deserialize_msg().
-The current message format (version 2.0) is very simple. It is:
+The current message format (version 2.0) is very simple. It is::
{
'oslo.version': <RPC Envelope Version as a String>,
@@ -64,7 +65,6 @@ We will JSON encode the application message payload. The message envelope,
which includes the JSON encoded application message body, will be passed down
to the messaging libraries as a dict.
'''
-_RPC_ENVELOPE_VERSION = '2.0'
_VERSION_KEY = 'oslo.version'
_MESSAGE_KEY = 'oslo.message'
@@ -86,7 +86,7 @@ class RPCException(Exception):
# kwargs doesn't match a variable in the message
# log the issue and the kwargs
LOG.exception(_('Exception in string format operation'))
- for name, value in kwargs.iteritems():
+ for name, value in six.iteritems(kwargs):
LOG.error("%s: %s" % (name, value))
# at least get the core message out if something happened
message = self.msg_fmt
diff --git a/keystone/openstack/common/rpc/dispatcher.py b/keystone/openstack/common/rpc/dispatcher.py
index 51f0e19b6..2bcfe79af 100644
--- a/keystone/openstack/common/rpc/dispatcher.py
+++ b/keystone/openstack/common/rpc/dispatcher.py
@@ -81,6 +81,8 @@ On the client side, the same changes should be made as in example 1. The
minimum version that supports the new parameter should be specified.
"""
+import six
+
from keystone.openstack.common.rpc import common as rpc_common
from keystone.openstack.common.rpc import serializer as rpc_serializer
@@ -119,7 +121,7 @@ class RpcDispatcher(object):
:returns: A new set of deserialized args
"""
new_kwargs = dict()
- for argname, arg in kwargs.iteritems():
+ for argname, arg in six.iteritems(kwargs):
new_kwargs[argname] = self.serializer.deserialize_entity(context,
arg)
return new_kwargs
diff --git a/keystone/openstack/common/rpc/impl_kombu.py b/keystone/openstack/common/rpc/impl_kombu.py
index 9cd2e2ae5..ec1cb2780 100644
--- a/keystone/openstack/common/rpc/impl_kombu.py
+++ b/keystone/openstack/common/rpc/impl_kombu.py
@@ -445,7 +445,7 @@ class Connection(object):
'virtual_host': self.conf.rabbit_virtual_host,
}
- for sp_key, value in server_params.iteritems():
+ for sp_key, value in six.iteritems(server_params):
p_key = server_params_to_kombu_params.get(sp_key, sp_key)
params[p_key] = value
diff --git a/keystone/openstack/common/rpc/matchmaker.py b/keystone/openstack/common/rpc/matchmaker.py
index f7fa5e0cd..b5ea8deb2 100644
--- a/keystone/openstack/common/rpc/matchmaker.py
+++ b/keystone/openstack/common/rpc/matchmaker.py
@@ -90,7 +90,7 @@ class MatchMakerBase(object):
"""Acknowledge that a key.host is alive.
Used internally for updating heartbeats, but may also be used
- publically to acknowledge a system is alive (i.e. rpc message
+ publicly to acknowledge a system is alive (i.e. rpc message
successfully sent to host)
"""
pass
@@ -172,7 +172,7 @@ class HeartbeatMatchMakerBase(MatchMakerBase):
"""Acknowledge that a host.topic is alive.
Used internally for updating heartbeats, but may also be used
- publically to acknowledge a system is alive (i.e. rpc message
+ publicly to acknowledge a system is alive (i.e. rpc message
successfully sent to host)
"""
raise NotImplementedError("Must implement ack_alive")
diff --git a/keystone/openstack/common/rpc/proxy.py b/keystone/openstack/common/rpc/proxy.py
index f8409a035..ee1709284 100644
--- a/keystone/openstack/common/rpc/proxy.py
+++ b/keystone/openstack/common/rpc/proxy.py
@@ -19,6 +19,8 @@ For more information about rpc API version numbers, see:
rpc/dispatcher.py
"""
+import six
+
from keystone.openstack.common import rpc
from keystone.openstack.common.rpc import common as rpc_common
from keystone.openstack.common.rpc import serializer as rpc_serializer
@@ -97,7 +99,7 @@ class RpcProxy(object):
:returns: A new set of serialized arguments
"""
new_kwargs = dict()
- for argname, arg in kwargs.iteritems():
+ for argname, arg in six.iteritems(kwargs):
new_kwargs[argname] = self.serializer.serialize_entity(context,
arg)
return new_kwargs
diff --git a/keystone/openstack/common/rpc/securemessage.py b/keystone/openstack/common/rpc/securemessage.py
deleted file mode 100644
index ef5e191c2..000000000
--- a/keystone/openstack/common/rpc/securemessage.py
+++ /dev/null
@@ -1,521 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2013 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import base64
-import collections
-import os
-import struct
-import time
-
-import requests
-
-from oslo.config import cfg
-
-from keystone.openstack.common.crypto import utils as cryptoutils
-from keystone.openstack.common import jsonutils
-from keystone.openstack.common import log as logging
-
-secure_message_opts = [
- cfg.BoolOpt('enabled', default=True,
- help='Whether Secure Messaging (Signing) is enabled,'
- ' defaults to enabled'),
- cfg.BoolOpt('enforced', default=False,
- help='Whether Secure Messaging (Signing) is enforced,'
- ' defaults to not enforced'),
- cfg.BoolOpt('encrypt', default=False,
- help='Whether Secure Messaging (Encryption) is enabled,'
- ' defaults to not enabled'),
- cfg.StrOpt('secret_keys_file',
- help='Path to the file containing the keys, takes precedence'
- ' over secret_key'),
- cfg.MultiStrOpt('secret_key',
- help='A list of keys: (ex: name:<base64 encoded key>),'
- ' ignored if secret_keys_file is set'),
- cfg.StrOpt('kds_endpoint',
- help='KDS endpoint (ex: http://kds.example.com:35357/v3)'),
-]
-secure_message_group = cfg.OptGroup('secure_messages',
- title='Secure Messaging options')
-
-LOG = logging.getLogger(__name__)
-
-
-class SecureMessageException(Exception):
- """Generic Exception for Secure Messages."""
-
- msg = "An unknown Secure Message related exception occurred."
-
- def __init__(self, msg=None):
- if msg is None:
- msg = self.msg
- super(SecureMessageException, self).__init__(msg)
-
-
-class SharedKeyNotFound(SecureMessageException):
- """No shared key was found and no other external authentication mechanism
- is available.
- """
-
- msg = "Shared Key for [%s] Not Found. (%s)"
-
- def __init__(self, name, errmsg):
- super(SharedKeyNotFound, self).__init__(self.msg % (name, errmsg))
-
-
-class InvalidMetadata(SecureMessageException):
- """The metadata is invalid."""
-
- msg = "Invalid metadata: %s"
-
- def __init__(self, err):
- super(InvalidMetadata, self).__init__(self.msg % err)
-
-
-class InvalidSignature(SecureMessageException):
- """Signature validation failed."""
-
- msg = "Failed to validate signature (source=%s, destination=%s)"
-
- def __init__(self, src, dst):
- super(InvalidSignature, self).__init__(self.msg % (src, dst))
-
-
-class UnknownDestinationName(SecureMessageException):
- """The Destination name is unknown to us."""
-
- msg = "Invalid destination name (%s)"
-
- def __init__(self, name):
- super(UnknownDestinationName, self).__init__(self.msg % name)
-
-
-class InvalidEncryptedTicket(SecureMessageException):
- """The Encrypted Ticket could not be successfully handled."""
-
- msg = "Invalid Ticket (source=%s, destination=%s)"
-
- def __init__(self, src, dst):
- super(InvalidEncryptedTicket, self).__init__(self.msg % (src, dst))
-
-
-class InvalidExpiredTicket(SecureMessageException):
- """The ticket received is already expired."""
-
- msg = "Expired ticket (source=%s, destination=%s)"
-
- def __init__(self, src, dst):
- super(InvalidExpiredTicket, self).__init__(self.msg % (src, dst))
-
-
-class CommunicationError(SecureMessageException):
- """The Communication with the KDS failed."""
-
- msg = "Communication Error (target=%s): %s"
-
- def __init__(self, target, errmsg):
- super(CommunicationError, self).__init__(self.msg % (target, errmsg))
-
-
-class InvalidArgument(SecureMessageException):
- """Bad initialization argument."""
-
- msg = "Invalid argument: %s"
-
- def __init__(self, errmsg):
- super(InvalidArgument, self).__init__(self.msg % errmsg)
-
-
-Ticket = collections.namedtuple('Ticket', ['skey', 'ekey', 'esek'])
-
-
-class KeyStore(object):
- """A storage class for Signing and Encryption Keys.
-
- This class creates an object that holds Generic Keys like Signing
- Keys, Encryption Keys, Encrypted SEK Tickets ...
- """
-
- def __init__(self):
- self._kvps = dict()
-
- def _get_key_name(self, source, target, ktype):
- return (source, target, ktype)
-
- def _put(self, src, dst, ktype, expiration, data):
- name = self._get_key_name(src, dst, ktype)
- self._kvps[name] = (expiration, data)
-
- def _get(self, src, dst, ktype):
- name = self._get_key_name(src, dst, ktype)
- if name in self._kvps:
- expiration, data = self._kvps[name]
- if expiration > time.time():
- return data
- else:
- del self._kvps[name]
-
- return None
-
- def clear(self):
- """Wipes the store clear of all data."""
- self._kvps.clear()
-
- def put_ticket(self, source, target, skey, ekey, esek, expiration):
- """Puts a sek pair in the cache.
-
- :param source: Client name
- :param target: Target name
- :param skey: The Signing Key
- :param ekey: The Encription Key
- :param esek: The token encrypted with the target key
- :param expiration: Expiration time in seconds since Epoch
- """
- keys = Ticket(skey, ekey, esek)
- self._put(source, target, 'ticket', expiration, keys)
-
- def get_ticket(self, source, target):
- """Returns a Ticket (skey, ekey, esek) namedtuple for the
- source/target pair.
- """
- return self._get(source, target, 'ticket')
-
-
-_KEY_STORE = KeyStore()
-
-
-class _KDSClient(object):
-
- USER_AGENT = 'oslo-incubator/rpc'
-
- def __init__(self, endpoint=None, timeout=None):
- """A KDS Client class."""
-
- self._endpoint = endpoint
- if timeout is not None:
- self.timeout = float(timeout)
- else:
- self.timeout = None
-
- def _do_get(self, url, request):
- req_kwargs = dict()
- req_kwargs['headers'] = dict()
- req_kwargs['headers']['User-Agent'] = self.USER_AGENT
- req_kwargs['headers']['Content-Type'] = 'application/json'
- req_kwargs['data'] = jsonutils.dumps({'request': request})
- if self.timeout is not None:
- req_kwargs['timeout'] = self.timeout
-
- try:
- resp = requests.get(url, **req_kwargs)
- except requests.ConnectionError as e:
- err = "Unable to establish connection. %s" % e
- raise CommunicationError(url, err)
-
- return resp
-
- def _get_reply(self, url, resp):
- if resp.text:
- try:
- body = jsonutils.loads(resp.text)
- reply = body['reply']
- except (KeyError, TypeError, ValueError):
- msg = "Failed to decode reply: %s" % resp.text
- raise CommunicationError(url, msg)
- else:
- msg = "No reply data was returned."
- raise CommunicationError(url, msg)
-
- return reply
-
- def _get_ticket(self, request, url=None, redirects=10):
- """Send an HTTP request.
-
- Wraps around 'requests' to handle redirects and common errors.
- """
- if url is None:
- if not self._endpoint:
- raise CommunicationError(url, 'Endpoint not configured')
- url = self._endpoint + '/kds/ticket'
-
- while redirects:
- resp = self._do_get(url, request)
- if resp.status_code in (301, 302, 305):
- # Redirected. Reissue the request to the new location.
- url = resp.headers['location']
- redirects -= 1
- continue
- elif resp.status_code != 200:
- msg = "Request returned failure status: %s (%s)"
- err = msg % (resp.status_code, resp.text)
- raise CommunicationError(url, err)
-
- return self._get_reply(url, resp)
-
- raise CommunicationError(url, "Too many redirections, giving up!")
-
- def get_ticket(self, source, target, crypto, key):
-
- # prepare metadata
- md = {'requestor': source,
- 'target': target,
- 'timestamp': time.time(),
- 'nonce': struct.unpack('Q', os.urandom(8))[0]}
- metadata = base64.b64encode(jsonutils.dumps(md))
-
- # sign metadata
- signature = crypto.sign(key, metadata)
-
- # HTTP request
- reply = self._get_ticket({'metadata': metadata,
- 'signature': signature})
-
- # verify reply
- signature = crypto.sign(key, (reply['metadata'] + reply['ticket']))
- if signature != reply['signature']:
- raise InvalidEncryptedTicket(md['source'], md['destination'])
- md = jsonutils.loads(base64.b64decode(reply['metadata']))
- if ((md['source'] != source or
- md['destination'] != target or
- md['expiration'] < time.time())):
- raise InvalidEncryptedTicket(md['source'], md['destination'])
-
- # return ticket data
- tkt = jsonutils.loads(crypto.decrypt(key, reply['ticket']))
-
- return tkt, md['expiration']
-
-
-# we need to keep a global nonce, as this value should never repeat non
-# matter how many SecureMessage objects we create
-_NONCE = None
-
-
-def _get_nonce():
- """We keep a single counter per instance, as it is so huge we can't
- possibly cycle through within 1/100 of a second anyway.
- """
-
- global _NONCE
- # Lazy initialize, for now get a random value, multiply by 2^32 and
- # use it as the nonce base. The counter itself will rotate after
- # 2^32 increments.
- if _NONCE is None:
- _NONCE = [struct.unpack('I', os.urandom(4))[0], 0]
-
- # Increment counter and wrap at 2^32
- _NONCE[1] += 1
- if _NONCE[1] > 0xffffffff:
- _NONCE[1] = 0
-
- # Return base + counter
- return long((_NONCE[0] * 0xffffffff)) + _NONCE[1]
-
-
-class SecureMessage(object):
- """A Secure Message object.
-
- This class creates a signing/encryption facility for RPC messages.
- It encapsulates all the necessary crypto primitives to insulate
- regular code from the intricacies of message authentication, validation
- and optionally encryption.
-
- :param topic: The topic name of the queue
- :param host: The server name, together with the topic it forms a unique
- name that is used to source signing keys, and verify
- incoming messages.
- :param conf: a ConfigOpts object
- :param key: (optional) explicitly pass in endpoint private key.
- If not provided it will be sourced from the service config
- :param key_store: (optional) Storage class for local caching
- :param encrypt: (defaults to False) Whether to encrypt messages
- :param enctype: (defaults to AES) Cipher to use
- :param hashtype: (defaults to SHA256) Hash function to use for signatures
- """
-
- def __init__(self, topic, host, conf, key=None, key_store=None,
- encrypt=None, enctype='AES', hashtype='SHA256'):
-
- conf.register_group(secure_message_group)
- conf.register_opts(secure_message_opts, group='secure_messages')
-
- self._name = '%s.%s' % (topic, host)
- self._key = key
- self._conf = conf.secure_messages
- self._encrypt = self._conf.encrypt if (encrypt is None) else encrypt
- self._crypto = cryptoutils.SymmetricCrypto(enctype, hashtype)
- self._hkdf = cryptoutils.HKDF(hashtype)
- self._kds = _KDSClient(self._conf.kds_endpoint)
-
- if self._key is None:
- self._key = self._init_key(topic, self._name)
- if self._key is None:
- err = "Secret Key (or key file) is missing or malformed"
- raise SharedKeyNotFound(self._name, err)
-
- self._key_store = key_store or _KEY_STORE
-
- def _init_key(self, topic, name):
- keys = None
- if self._conf.secret_keys_file:
- with open(self._conf.secret_keys_file, 'r') as f:
- keys = f.readlines()
- elif self._conf.secret_key:
- keys = self._conf.secret_key
-
- if keys is None:
- return None
-
- for k in keys:
- if k[0] == '#':
- continue
- if ':' not in k:
- break
- svc, key = k.split(':', 1)
- if svc == topic or svc == name:
- return base64.b64decode(key)
-
- return None
-
- def _split_key(self, key, size):
- sig_key = key[:size]
- enc_key = key[size:]
- return sig_key, enc_key
-
- def _decode_esek(self, key, source, target, timestamp, esek):
- """This function decrypts the esek buffer passed in and returns a
- KeyStore to be used to check and decrypt the received message.
-
- :param key: The key to use to decrypt the ticket (esek)
- :param source: The name of the source service
- :param traget: The name of the target service
- :param timestamp: The incoming message timestamp
- :param esek: a base64 encoded encrypted block containing a JSON string
- """
- rkey = None
-
- try:
- s = self._crypto.decrypt(key, esek)
- j = jsonutils.loads(s)
-
- rkey = base64.b64decode(j['key'])
- expiration = j['timestamp'] + j['ttl']
- if j['timestamp'] > timestamp or timestamp > expiration:
- raise InvalidExpiredTicket(source, target)
-
- except Exception:
- raise InvalidEncryptedTicket(source, target)
-
- info = '%s,%s,%s' % (source, target, str(j['timestamp']))
-
- sek = self._hkdf.expand(rkey, info, len(key) * 2)
-
- return self._split_key(sek, len(key))
-
- def _get_ticket(self, target):
- """This function will check if we already have a SEK for the specified
- target in the cache, or will go and try to fetch a new SEK from the key
- server.
-
- :param target: The name of the target service
- """
- ticket = self._key_store.get_ticket(self._name, target)
-
- if ticket is not None:
- return ticket
-
- tkt, expiration = self._kds.get_ticket(self._name, target,
- self._crypto, self._key)
-
- self._key_store.put_ticket(self._name, target,
- base64.b64decode(tkt['skey']),
- base64.b64decode(tkt['ekey']),
- tkt['esek'], expiration)
- return self._key_store.get_ticket(self._name, target)
-
- def encode(self, version, target, json_msg):
- """This is the main encoding function.
-
- It takes a target and a message and returns a tuple consisting of a
- JSON serialized metadata object, a JSON serialized (and optionally
- encrypted) message, and a signature.
-
- :param version: the current envelope version
- :param target: The name of the target service (usually with hostname)
- :param json_msg: a serialized json message object
- """
- ticket = self._get_ticket(target)
-
- metadata = jsonutils.dumps({'source': self._name,
- 'destination': target,
- 'timestamp': time.time(),
- 'nonce': _get_nonce(),
- 'esek': ticket.esek,
- 'encryption': self._encrypt})
-
- message = json_msg
- if self._encrypt:
- message = self._crypto.encrypt(ticket.ekey, message)
-
- signature = self._crypto.sign(ticket.skey,
- version + metadata + message)
-
- return (metadata, message, signature)
-
- def decode(self, version, metadata, message, signature):
- """This is the main decoding function.
-
- It takes a version, metadata, message and signature strings and
- returns a tuple with a (decrypted) message and metadata or raises
- an exception in case of error.
-
- :param version: the current envelope version
- :param metadata: a JSON serialized object with metadata for validation
- :param message: a JSON serialized (base64 encoded encrypted) message
- :param signature: a base64 encoded signature
- """
- md = jsonutils.loads(metadata)
-
- check_args = ('source', 'destination', 'timestamp',
- 'nonce', 'esek', 'encryption')
- for arg in check_args:
- if arg not in md:
- raise InvalidMetadata('Missing metadata "%s"' % arg)
-
- if md['destination'] != self._name:
- # TODO(simo) handle group keys by checking target
- raise UnknownDestinationName(md['destination'])
-
- try:
- skey, ekey = self._decode_esek(self._key,
- md['source'], md['destination'],
- md['timestamp'], md['esek'])
- except InvalidExpiredTicket:
- raise
- except Exception:
- raise InvalidMetadata('Failed to decode ESEK for %s/%s' % (
- md['source'], md['destination']))
-
- sig = self._crypto.sign(skey, version + metadata + message)
-
- if sig != signature:
- raise InvalidSignature(md['source'], md['destination'])
-
- if md['encryption'] is True:
- msg = self._crypto.decrypt(ekey, message)
- else:
- msg = message
-
- return (md, msg)