summaryrefslogtreecommitdiff
path: root/glance
diff options
context:
space:
mode:
authorZuul <zuul@review.opendev.org>2021-08-24 01:05:48 +0000
committerGerrit Code Review <review@openstack.org>2021-08-24 01:05:48 +0000
commit646e15da442a51fd13a9bb4864d04295287c0680 (patch)
tree4fe80ce928919a4d0e6ff3d769402d5c7b70b3b2 /glance
parentd23c593303de00c960833179d80264f6af474e31 (diff)
parentde560b19e6bcc6e3eb7bbc94b42710c4ca8819ce (diff)
downloadglance-646e15da442a51fd13a9bb4864d04295287c0680.tar.gz
Merge "Check add_image policy in the API"
Diffstat (limited to 'glance')
-rw-r--r--glance/api/v2/images.py12
-rw-r--r--glance/api/v2/policy.py84
-rw-r--r--glance/policies/base.py3
-rw-r--r--glance/policies/image.py2
-rw-r--r--glance/tests/functional/v2/test_images_api_policy.py30
-rw-r--r--glance/tests/unit/v2/test_v2_policy.py57
6 files changed, 182 insertions, 6 deletions
diff --git a/glance/api/v2/images.py b/glance/api/v2/images.py
index bd6545109..8562adad9 100644
--- a/glance/api/v2/images.py
+++ b/glance/api/v2/images.py
@@ -92,9 +92,17 @@ class ImagesController(object):
@utils.mutating
def create(self, req, image, extra_properties, tags):
- image_factory = self.gateway.get_image_factory(req.context)
- image_repo = self.gateway.get_repo(req.context)
+ image_factory = self.gateway.get_image_factory(
+ req.context, authorization_layer=False)
+ image_repo = self.gateway.get_repo(req.context,
+ authorization_layer=False)
try:
+ if 'owner' not in image:
+ image['owner'] = req.context.project_id
+
+ api_policy.ImageAPIPolicy(req.context, image,
+ self.policy).add_image()
+
ks_quota.enforce_image_count_total(req.context, req.context.owner)
image = image_factory.new_image(extra_properties=extra_properties,
tags=tags, **image)
diff --git a/glance/api/v2/policy.py b/glance/api/v2/policy.py
index 61b73e685..6babef154 100644
--- a/glance/api/v2/policy.py
+++ b/glance/api/v2/policy.py
@@ -53,6 +53,28 @@ def check_is_image_mutable(context, image):
raise exception.Forbidden(_('You do not own this image'))
+def check_admin_or_same_owner(context, properties):
+ """Check that legacy behavior on create with owner is preserved.
+
+ Legacy behavior requires a static check that owner is not
+ inconsistent with the context, unless the caller is an
+ admin. Enforce that here, if needed.
+
+ :param context: A RequestContext
+ :param properties: The properties being used to create the image, which may
+ contain an owner
+ :raises: exception.Forbidden if the context is not an admin and owner is
+ set to something other than the context's project
+ """
+ if context.is_admin:
+ return
+
+ if context.project_id != properties.get('owner', context.project_id):
+ msg = _("You are not permitted to create images "
+ "owned by '%s'.")
+ raise exception.Forbidden(msg % properties['owner'])
+
+
class APIPolicyBase(object):
def __init__(self, context, target=None, enforcer=None):
self._context = context
@@ -84,16 +106,53 @@ class APIPolicyBase(object):
class ImageAPIPolicy(APIPolicyBase):
def __init__(self, context, image, enforcer=None):
- super(ImageAPIPolicy, self).__init__(context,
- policy.ImageTarget(image),
- enforcer)
+ """Image API policy module.
+
+ :param context: The RequestContext
+ :param image: The ImageProxy object in question, or a dict of image
+ properties if no image is yet created or needed for
+ authorization context.
+ :param enforcer: The policy.Enforcer object to use for enforcement
+ operations. If not provided (or None), the default
+ enforcer will be selected.
+ """
self._image = image
+ if not self.is_created:
+ # NOTE(danms): If we are being called with a dict of image
+ # properties then we are testing policies that involve
+ # creating an image or other image-related resources but
+ # without a specific image for context. The target is a
+ # dict of proposed image properties, similar to the
+ # dict-like interface the ImageTarget provides over
+ # a real Image object, with specific keys.
+ target = {'project_id': image.get('owner', context.project_id),
+ 'owner': image.get('owner', context.project_id),
+ 'visibility': image.get('visibility', 'private')}
+ else:
+ target = policy.ImageTarget(image)
+ super(ImageAPIPolicy, self).__init__(context, target, enforcer)
+
+ @property
+ def is_created(self):
+ """Signal whether the image actually exists or not.
+
+ False if the image is only being proposed by a create operation,
+ True if it has already been created.
+ """
+ return not isinstance(self._image, dict)
def _enforce(self, rule_name):
"""Translate Forbidden->NotFound for images."""
try:
super(ImageAPIPolicy, self)._enforce(rule_name)
except webob.exc.HTTPForbidden:
+ # If we are checking image policy before creating an
+ # image, or without a specific image for context, then we
+ # do not need to potentially hide the presence of anything
+ # based on visibility, so re-raise immediately.
+ if not self.is_created:
+ raise
+
# If we are checking get_image, then Forbidden means the
# user cannot see this image, so raise NotFound. If we are
# checking anything else and get Forbidden, then raise
@@ -143,6 +202,25 @@ class ImageAPIPolicy(APIPolicyBase):
def get_image_location(self):
self._enforce('get_image_location')
+ def add_image(self):
+ try:
+ self._enforce('add_image')
+ except webob.exc.HTTPForbidden:
+ # NOTE(danms): If we fail add_image because the owner is
+ # different, alter the message to be informative and
+ # in-line with the current message users have been getting
+ # in the past.
+ if self._target['owner'] != self._context.project_id:
+ msg = _("You are not permitted to create images "
+ "owned by '%s'" % self._target['owner'])
+ raise webob.exc.HTTPForbidden(msg)
+ else:
+ raise
+ if 'visibility' in self._target:
+ self._enforce_visibility(self._target['visibility'])
+ if not CONF.enforce_secure_rbac:
+ check_admin_or_same_owner(self._context, self._target)
+
def get_image(self):
self._enforce('get_image')
diff --git a/glance/policies/base.py b/glance/policies/base.py
index 8907e3f9b..5bc1ac903 100644
--- a/glance/policies/base.py
+++ b/glance/policies/base.py
@@ -69,6 +69,9 @@ ADMIN_OR_PROJECT_MEMBER_DOWNLOAD_IMAGE = (
f'role:admin or '
f'({PROJECT_MEMBER_OR_IMAGE_MEMBER_OR_COMMUNITY_OR_PUBLIC_OR_SHARED})'
)
+ADMIN_OR_PROJECT_MEMBER_CREATE_IMAGE = (
+ f'role:admin or ({PROJECT_MEMBER} and project_id:%(owner)s)'
+)
ADMIN_OR_SHARED_MEMBER = (
diff --git a/glance/policies/image.py b/glance/policies/image.py
index 9d2f91344..7db4e139f 100644
--- a/glance/policies/image.py
+++ b/glance/policies/image.py
@@ -23,7 +23,7 @@ The image API now supports roles.
image_policies = [
policy.DocumentedRuleDefault(
name="add_image",
- check_str=base.ADMIN_OR_PROJECT_MEMBER,
+ check_str=base.ADMIN_OR_PROJECT_MEMBER_CREATE_IMAGE,
scope_types=['system', 'project'],
description='Create new image',
operations=[
diff --git a/glance/tests/functional/v2/test_images_api_policy.py b/glance/tests/functional/v2/test_images_api_policy.py
index 7d02cd200..0cfa3bb3b 100644
--- a/glance/tests/functional/v2/test_images_api_policy.py
+++ b/glance/tests/functional/v2/test_images_api_policy.py
@@ -206,6 +206,36 @@ class TestImagesPolicy(functional.SynchronousAPIBase):
resp = self.api_get('/v2/images/%s' % image_id)
self.assertEqual(404, resp.status_code)
+ def test_image_create(self):
+ self.start_server()
+
+ # Make sure we can create an image
+ self.assertEqual(201, self._create().status_code)
+
+ # Now disable add_image and make sure we get 403
+ self.set_policy_rules({'add_image': '!'})
+
+ self.assertEqual(403, self._create().status_code)
+
+ def test_image_create_by_another(self):
+ self.start_server()
+
+ # NOTE(danms): There is no policy override in this test,
+ # specifically to test that the defaults (for rbac and
+ # non-rbac) properly catch the attempt by a non-admin to
+ # create an image owned by someone else.
+
+ image = {'name': 'foo',
+ 'container_format': 'bare',
+ 'disk_format': 'raw',
+ 'owner': 'someoneelse'}
+ resp = self.api_post('/v2/images',
+ json=image,
+ headers={'X-Roles': 'member'})
+ # Make sure we get the expected owner-specific error message
+ self.assertIn("You are not permitted to create images "
+ "owned by 'someoneelse'", resp.text)
+
def test_image_delete(self):
self.start_server()
diff --git a/glance/tests/unit/v2/test_v2_policy.py b/glance/tests/unit/v2/test_v2_policy.py
index 205d2ae70..80b6fab2c 100644
--- a/glance/tests/unit/v2/test_v2_policy.py
+++ b/glance/tests/unit/v2/test_v2_policy.py
@@ -223,6 +223,63 @@ class APIImagePolicy(APIPolicyBase):
'get_images',
mock.ANY)
+ def test_add_image(self):
+ generic_target = {'project_id': self.context.project_id,
+ 'owner': self.context.project_id,
+ 'visibility': 'private'}
+ self.policy = policy.ImageAPIPolicy(self.context, {},
+ enforcer=self.enforcer)
+ self.policy.add_image()
+ self.enforcer.enforce.assert_called_once_with(self.context,
+ 'add_image',
+ generic_target)
+
+ def test_add_image_falls_back_to_legacy(self):
+ self.config(enforce_secure_rbac=False)
+
+ self.context.is_admin = False
+ self.policy = policy.ImageAPIPolicy(self.context, {'owner': 'else'},
+ enforcer=self.enforcer)
+ self.assertRaises(exception.Forbidden, self.policy.add_image)
+
+ # Make sure we're calling the legacy handler if secure_rbac is False
+ with mock.patch('glance.api.v2.policy.check_admin_or_same_owner') as m:
+ self.policy.add_image()
+ m.assert_called_once_with(self.context, {'project_id': 'else',
+ 'owner': 'else',
+ 'visibility': 'private'})
+
+ # Make sure we are not calling the legacy handler if
+ # secure_rbac is being used. We won't fail the check because
+ # our enforcer is a mock, just make sure we don't call that handler.
+ self.config(enforce_secure_rbac=True)
+ with mock.patch('glance.api.v2.policy.check_admin_or_same_owner') as m:
+ self.policy.add_image()
+ m.assert_not_called()
+
+ def test_add_image_translates_owner_failure(self):
+ self.policy = policy.ImageAPIPolicy(self.context, {'owner': 'else'},
+ enforcer=self.enforcer)
+ # Make sure add_image works with no exception
+ self.policy.add_image()
+
+ # Make sure we don't get in the way of any other exceptions
+ self.enforcer.enforce.side_effect = exception.Duplicate
+ self.assertRaises(exception.Duplicate, self.policy.add_image)
+
+ # If the exception is HTTPForbidden and the owner differs,
+ # make sure we get the proper message translation
+ self.enforcer.enforce.side_effect = webob.exc.HTTPForbidden('original')
+ exc = self.assertRaises(webob.exc.HTTPForbidden, self.policy.add_image)
+ self.assertIn('You are not permitted to create images owned by',
+ str(exc))
+
+ # If the owner does not differ, make sure we get the original reason
+ self.policy = policy.ImageAPIPolicy(self.context, {},
+ enforcer=self.enforcer)
+ exc = self.assertRaises(webob.exc.HTTPForbidden, self.policy.add_image)
+ self.assertIn('original', str(exc))
+
def test_delete_image(self):
self.policy.delete_image()
self.enforcer.enforce.assert_called_once_with(self.context,