summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJamie Lennox <jamielennox@redhat.com>2015-01-12 14:48:22 +1000
committerJamie Lennox <jamielennox@redhat.com>2015-03-20 11:48:15 +1100
commit6c7d3581861500a6bf0d9de83d11fed18eef9d39 (patch)
treebe84c6d4d3b97b5e27d6ce570e7608116b662a3e
parent2956841a2ed8e14cc57d802763e0c5cda1b330ee (diff)
downloadheat-6c7d3581861500a6bf0d9de83d11fed18eef9d39.tar.gz
Use auth plugins for domain client and admin client
Extend the use of auth_plugins to encompass the other two keystone client uses. Change-Id: I0ed6f8a14fcc31be14071823609fdb78e21ee195
-rw-r--r--heat/common/heat_keystoneclient.py112
-rw-r--r--heat/tests/test_heatclient.py169
2 files changed, 134 insertions, 147 deletions
diff --git a/heat/common/heat_keystoneclient.py b/heat/common/heat_keystoneclient.py
index 0346a9588..ebc66c6d6 100644
--- a/heat/common/heat_keystoneclient.py
+++ b/heat/common/heat_keystoneclient.py
@@ -16,6 +16,7 @@
import collections
import uuid
+from keystoneclient.auth.identity import v3 as kc_auth_v3
import keystoneclient.exceptions as kc_exception
from keystoneclient import session
from keystoneclient.v3 import client as kc_v3
@@ -70,7 +71,8 @@ class KeystoneClientV3(object):
# path, we will work with either a v2.0 or v3 path
self.context = context
self._client = None
- self._admin_client = None
+ self._admin_auth = None
+ self._domain_admin_auth = None
self._domain_admin_client = None
self.session = session.Session.construct(self._ssl_options())
@@ -122,39 +124,55 @@ class KeystoneClientV3(object):
return self._client
@property
- def admin_client(self):
- if not self._admin_client:
- # Create admin client connection to v3 API
- admin_creds = self._service_admin_creds()
- admin_creds.update(self._ssl_options())
- c = kc_v3.Client(**admin_creds)
- try:
- c.authenticate()
- self._admin_client = c
- except kc_exception.Unauthorized:
- LOG.error(_LE("Admin client authentication failed"))
- raise exception.AuthorizationFailure()
- return self._admin_client
+ def admin_auth(self):
+ if not self._admin_auth:
+ importutils.import_module('keystonemiddleware.auth_token')
+
+ self._admin_auth = kc_auth_v3.Password(
+ username=cfg.CONF.keystone_authtoken.admin_user,
+ password=cfg.CONF.keystone_authtoken.admin_password,
+ user_domain_id='default',
+ auth_url=self.v3_endpoint,
+ project_name=cfg.CONF.keystone_authtoken.admin_tenant_name,
+ project_domain_id='default')
+
+ return self._admin_auth
@property
- def domain_admin_client(self):
- if not self._domain_admin_client:
- # Create domain admin client connection to v3 API
- admin_creds = self._domain_admin_creds()
- admin_creds.update(self._ssl_options())
- c = kc_v3.Client(**admin_creds)
+ def domain_admin_auth(self):
+ if not self._domain_admin_auth:
# Note we must specify the domain when getting the token
# as only a domain scoped token can create projects in the domain
if self._stack_domain_is_id:
- auth_kwargs = {'domain_id': self.stack_domain}
+ auth_kwargs = {'domain_id': self.stack_domain,
+ 'user_domain_id': self.stack_domain}
else:
- auth_kwargs = {'domain_name': self.stack_domain}
+ auth_kwargs = {'domain_name': self.stack_domain,
+ 'user_domain_name': self.stack_domain}
+
+ auth = kc_auth_v3.Password(username=self.domain_admin_user,
+ password=self.domain_admin_password,
+ auth_url=self.v3_endpoint,
+ **auth_kwargs)
+
+ # NOTE(jamielennox): just do something to ensure a valid token
try:
- c.authenticate(**auth_kwargs)
- self._domain_admin_client = c
+ auth.get_token(self.session)
except kc_exception.Unauthorized:
LOG.error(_LE("Domain admin client authentication failed"))
raise exception.AuthorizationFailure()
+
+ self._domain_admin_auth = auth
+
+ return self._domain_admin_auth
+
+ @property
+ def domain_admin_client(self):
+ if not self._domain_admin_client:
+ self._domain_admin_client = kc_v3.Client(
+ session=self.session,
+ auth=self.domain_admin_auth)
+
return self._domain_admin_client
def _v3_client_init(self):
@@ -182,29 +200,6 @@ class KeystoneClientV3(object):
return client
- def _service_admin_creds(self):
- # Import auth_token to have keystone_authtoken settings setup.
- importutils.import_module('keystonemiddleware.auth_token')
- creds = {
- 'username': cfg.CONF.keystone_authtoken.admin_user,
- 'password': cfg.CONF.keystone_authtoken.admin_password,
- 'auth_url': self.v3_endpoint,
- 'endpoint': self.v3_endpoint,
- 'project_name': cfg.CONF.keystone_authtoken.admin_tenant_name}
- return creds
-
- def _domain_admin_creds(self):
- creds = {
- 'username': self.domain_admin_user,
- 'password': self.domain_admin_password,
- 'auth_url': self.v3_endpoint,
- 'endpoint': self.v3_endpoint}
- if self._stack_domain_is_id:
- creds['user_domain_id'] = self.stack_domain
- else:
- creds['user_domain_name'] = self.stack_domain
- return creds
-
def _ssl_options(self):
opts = {'cacert': self._get_client_option('ca_file'),
'insecure': self._get_client_option('insecure'),
@@ -240,7 +235,16 @@ class KeystoneClientV3(object):
# We need the service admin user ID (not name), as the trustor user
# can't lookup the ID in keystoneclient unless they're admin
# workaround this by getting the user_id from admin_client
- trustee_user_id = self.admin_client.auth_ref.user_id
+
+ # NOTE(jamielennox): These should use the plugin get_user_id and
+ # get_project_id that will be available in the v1.1 keystoneclient
+
+ try:
+ trustee = self.admin_auth.get_access(self.session)
+ except kc_exception.Unauthorized:
+ LOG.error(_LE("Domain admin client authentication failed"))
+ raise exception.AuthorizationFailure()
+
trustor = self.context.auth_plugin.get_access(self.session)
# inherit the roles of the trustor, unless set trusts_delegated_roles
@@ -250,7 +254,7 @@ class KeystoneClientV3(object):
roles = self.context.roles
try:
trust = self.client.trusts.create(trustor_user=trustor.user_id,
- trustee_user=trustee_user_id,
+ trustee_user=trustee.user_id,
project=trustor.project_id,
impersonation=True,
role_names=roles)
@@ -396,8 +400,14 @@ class KeystoneClientV3(object):
if self._stack_domain_is_id:
self._stack_domain_id = self.stack_domain
else:
- self._stack_domain_id = (
- self._domain_admin_client.auth_ref.domain_id)
+ try:
+ access = self.domain_admin_auth.get_access(self.session)
+ except kc_exception.Unauthorized:
+ LOG.error(_LE("Keystone client authentication failed"))
+ raise exception.AuthorizationFailure()
+
+ self._stack_domain_id = access.domain_id
+
return self._stack_domain_id
def _check_stack_domain_user(self, user_id, project_id, action):
diff --git a/heat/tests/test_heatclient.py b/heat/tests/test_heatclient.py
index 2fd242e61..79c4c74a2 100644
--- a/heat/tests/test_heatclient.py
+++ b/heat/tests/test_heatclient.py
@@ -68,44 +68,41 @@ class KeystoneClientTest(common.HeatTestCase):
def _clear_domain_override(self):
cfg.CONF.clear_override('stack_user_domain_id')
- def _stub_admin_client(self, auth_ok=True):
- kc_v3.Client(
- auth_url='http://server.test:5000/v3',
- cacert=None,
- cert=None,
- endpoint='http://server.test:5000/v3',
- insecure=False,
- key=None,
- password='verybadpass',
- project_name='service',
- username='heat').AndReturn(self.mock_admin_client)
- self.mock_admin_client.domains = self.mock_ks_v3_client_domain_mngr
+ def _stub_admin_auth(self, auth_ok=True):
+ mock_ks_auth = self.m.CreateMockAnything()
+ auth_ref = self.m.CreateMockAnything()
+
+ a = mock_ks_auth.get_access(mox.IsA(ks_session.Session))
if auth_ok:
- self.mock_admin_client.authenticate().AndReturn(auth_ok)
- self.mock_admin_client.auth_ref = self.m.CreateMockAnything()
- self.mock_admin_client.auth_ref.user_id = '1234'
+ auth_ref.user_id = '1234'
+ a.AndReturn(auth_ref)
else:
- self.mock_admin_client.authenticate().AndRaise(
- kc_exception.Unauthorized)
-
- def _stub_domain_admin_client(self, auth_ok=True):
- kc_v3.Client(
- auth_url='http://server.test:5000/v3',
- cacert=None,
- cert=None,
- endpoint='http://server.test:5000/v3',
- insecure=False,
- key=None,
- password='adminsecret',
- user_domain_id='adomain123',
- username='adminuser123').AndReturn(self.mock_admin_client)
+ a.AndRaise(kc_exception.Unauthorized)
+
+ m = ks_auth_v3.Password(auth_url='http://server.test:5000/v3',
+ password='verybadpass',
+ user_domain_id='default',
+ project_name='service',
+ project_domain_id='default',
+ username='heat')
+ m.AndReturn(mock_ks_auth)
+
+ def _stub_domain_admin_client(self, domain_id=None):
+ mock_ks_auth = self.m.CreateMockAnything()
+ mock_ks_auth.get_token(mox.IsA(ks_session.Session)).AndReturn('tok')
+
+ m = ks_auth_v3.Password(auth_url='http://server.test:5000/v3',
+ password='adminsecret',
+ domain_id='adomain123',
+ user_domain_id='adomain123',
+ username='adminuser123')
+ m.AndReturn(mock_ks_auth)
+
+ n = kc_v3.Client(session=mox.IsA(ks_session.Session),
+ auth=mock_ks_auth)
+ n.AndReturn(self.mock_admin_client)
+
self.mock_admin_client.domains = self.mock_ks_v3_client_domain_mngr
- self.mock_admin_client.authenticate(
- domain_id='adomain123').AndReturn(auth_ok)
- if auth_ok:
- self.mock_admin_client.auth_ref = self.m.CreateMockAnything()
- self.mock_admin_client.auth_ref.user_id = '1234'
- self.mock_admin_client.auth_ref.domain_id = 'adomain123'
def _stubs_v3(self, method='token', trust_scoped=True,
user_id='trustor_user_id', auth_ref=None, client=True,
@@ -276,7 +273,7 @@ class KeystoneClientTest(common.HeatTestCase):
ctx = utils.dummy_context()
ctx.trust_id = None
- self._stub_domain_admin_client()
+ self._stub_domain_admin_client(domain_id=None)
# mock keystone client functions
self.mock_admin_client.roles = self.m.CreateMockAnything()
@@ -512,8 +509,8 @@ class KeystoneClientTest(common.HeatTestCase):
class MockTrust(object):
id = 'atrust123'
- self._stub_admin_client()
- mock_client, mock_auth_ref = self._stubs_v3(times=2)
+ self._stub_admin_auth()
+ mock_ks_auth, mock_auth_ref = self._stubs_v3(times=2)
cfg.CONF.set_override('deferred_auth_method', 'trusts')
if delegate_roles:
@@ -524,6 +521,10 @@ class KeystoneClientTest(common.HeatTestCase):
mock_auth_ref.user_id = '5678'
mock_auth_ref.project_id = '42'
+ self.mock_ks_v3_client.trusts = self.m.CreateMockAnything()
+
+ mock_auth_ref.user_id = '5678'
+ mock_auth_ref.project_id = '42'
self.mock_ks_v3_client.trusts = self.m.CreateMockAnything()
self.mock_ks_v3_client.trusts.create(
@@ -546,9 +547,11 @@ class KeystoneClientTest(common.HeatTestCase):
"""Test create_trust_context when creating a trust."""
- self._stub_admin_client()
-
+ self._stub_admin_auth()
+ # get_access gets called 2 times and so mox needs to have it registered
+ # twice, once for the trust check, then once to get the user_id
mock_auth, mock_auth_ref = self._stubs_v3(times=2)
+
cfg.CONF.set_override('deferred_auth_method', 'trusts')
cfg.CONF.set_override('trusts_delegated_roles', ['heat_stack_owner'])
@@ -606,42 +609,6 @@ class KeystoneClientTest(common.HeatTestCase):
'"stack_domain_admin_password"')
self.assertIn(exp_msg, six.text_type(err))
- def test_init_admin_client(self):
-
- """Test the admin_client property."""
-
- self._stub_admin_client()
- self.m.ReplayAll()
-
- ctx = utils.dummy_context()
- ctx.username = None
- ctx.password = None
- ctx.trust_id = None
- heat_ks_client = heat_keystoneclient.KeystoneClient(ctx)
- self.assertEqual(self.mock_admin_client, heat_ks_client.admin_client)
- self.assertEqual(self.mock_admin_client, heat_ks_client._admin_client)
-
- def test_init_admin_client_denied(self):
-
- """Test the admin_client property, auth failure path."""
-
- self._stub_admin_client(auth_ok=False)
- self.m.ReplayAll()
-
- ctx = utils.dummy_context()
- ctx.username = None
- ctx.password = None
- ctx.trust_id = None
- heat_ks_client = heat_keystoneclient.KeystoneClient(ctx)
-
- # Define wrapper for property or the property raises the exception
- # outside of the assertRaises which fails the test
- def get_admin_client():
- heat_ks_client.admin_client
-
- self.assertRaises(exception.AuthorizationFailure,
- get_admin_client)
-
def test_trust_init(self):
"""Test consuming a trust when initializing."""
@@ -1046,7 +1013,7 @@ class KeystoneClientTest(common.HeatTestCase):
"""Test creating ec2 credentials for domain user."""
- self._stub_domain_admin_client()
+ self._stub_domain_admin_client(domain_id=None)
ctx = utils.dummy_context()
ctx.trust_id = None
@@ -1306,7 +1273,7 @@ class KeystoneClientTest(common.HeatTestCase):
"""Test the delete_stack_domain_project function."""
- self._stub_domain_admin_client()
+ self._stub_domain_admin_client(domain_id=None)
self.mock_admin_client.projects = self.m.CreateMockAnything()
self.mock_admin_client.projects.get(project='aprojectid').AndRaise(
kc_exception.NotFound)
@@ -1321,7 +1288,7 @@ class KeystoneClientTest(common.HeatTestCase):
"""Test the delete_stack_domain_project function."""
- self._stub_domain_admin_client()
+ self._stub_domain_admin_client(domain_id=None)
self.mock_admin_client.projects = self.m.CreateMockAnything()
self.mock_admin_client.projects.get(project='aprojectid').AndRaise(
kc_exception.Forbidden)
@@ -1493,24 +1460,34 @@ class KeystoneClientTestDomainName(KeystoneClientTest):
def _clear_domain_override(self):
cfg.CONF.clear_override('stack_user_domain_name')
- def _stub_domain_admin_client(self, auth_ok=True):
- kc_v3.Client(
- auth_url='http://server.test:5000/v3',
- cacert=None,
- cert=None,
- endpoint='http://server.test:5000/v3',
- insecure=False,
- key=None,
- password='adminsecret',
- user_domain_name='fake_domain_name',
- username='adminuser123').AndReturn(self.mock_admin_client)
+ def _stub_domain_admin_client_domain_get(self):
+ dummy_domain = self.m.CreateMockAnything()
+ dummy_domain.id = 'adomain123'
+ self.mock_ks_v3_client_domain_mngr.list(
+ name='fake_domain_name').AndReturn([dummy_domain])
+
+ def _stub_domain_admin_client(self, domain_id='adomain123'):
+ mock_ks_auth = self.m.CreateMockAnything()
+ mock_ks_auth.get_token(mox.IsA(ks_session.Session)).AndReturn('tok')
+
+ if domain_id:
+ a = self.m.CreateMockAnything()
+ a.domain_id = domain_id
+ mock_ks_auth.get_access(mox.IsA(ks_session.Session)).AndReturn(a)
+
+ m = ks_auth_v3.Password(auth_url='http://server.test:5000/v3',
+ password='adminsecret',
+ domain_name='fake_domain_name',
+ user_domain_name='fake_domain_name',
+ username='adminuser123')
+
+ m.AndReturn(mock_ks_auth)
+
+ n = kc_v3.Client(session=mox.IsA(ks_session.Session),
+ auth=mock_ks_auth)
+ n.AndReturn(self.mock_admin_client)
+
self.mock_admin_client.domains = self.mock_ks_v3_client_domain_mngr
- self.mock_admin_client.authenticate(
- domain_name='fake_domain_name').AndReturn(auth_ok)
- if auth_ok:
- self.mock_admin_client.auth_ref = self.m.CreateMockAnything()
- self.mock_admin_client.auth_ref.user_id = '1234'
- self.mock_admin_client.auth_ref.domain_id = 'adomain123'
def _stub_domain_user_pw_auth(self):
ks_auth_v3.Password(auth_url='http://server.test:5000/v3',