diff options
author | Zuul <zuul@review.opendev.org> | 2021-08-24 01:05:48 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2021-08-24 01:05:48 +0000 |
commit | 646e15da442a51fd13a9bb4864d04295287c0680 (patch) | |
tree | 4fe80ce928919a4d0e6ff3d769402d5c7b70b3b2 /glance | |
parent | d23c593303de00c960833179d80264f6af474e31 (diff) | |
parent | de560b19e6bcc6e3eb7bbc94b42710c4ca8819ce (diff) | |
download | glance-646e15da442a51fd13a9bb4864d04295287c0680.tar.gz |
Merge "Check add_image policy in the API"
Diffstat (limited to 'glance')
-rw-r--r-- | glance/api/v2/images.py | 12 | ||||
-rw-r--r-- | glance/api/v2/policy.py | 84 | ||||
-rw-r--r-- | glance/policies/base.py | 3 | ||||
-rw-r--r-- | glance/policies/image.py | 2 | ||||
-rw-r--r-- | glance/tests/functional/v2/test_images_api_policy.py | 30 | ||||
-rw-r--r-- | glance/tests/unit/v2/test_v2_policy.py | 57 |
6 files changed, 182 insertions, 6 deletions
diff --git a/glance/api/v2/images.py b/glance/api/v2/images.py index bd6545109..8562adad9 100644 --- a/glance/api/v2/images.py +++ b/glance/api/v2/images.py @@ -92,9 +92,17 @@ class ImagesController(object): @utils.mutating def create(self, req, image, extra_properties, tags): - image_factory = self.gateway.get_image_factory(req.context) - image_repo = self.gateway.get_repo(req.context) + image_factory = self.gateway.get_image_factory( + req.context, authorization_layer=False) + image_repo = self.gateway.get_repo(req.context, + authorization_layer=False) try: + if 'owner' not in image: + image['owner'] = req.context.project_id + + api_policy.ImageAPIPolicy(req.context, image, + self.policy).add_image() + ks_quota.enforce_image_count_total(req.context, req.context.owner) image = image_factory.new_image(extra_properties=extra_properties, tags=tags, **image) diff --git a/glance/api/v2/policy.py b/glance/api/v2/policy.py index 61b73e685..6babef154 100644 --- a/glance/api/v2/policy.py +++ b/glance/api/v2/policy.py @@ -53,6 +53,28 @@ def check_is_image_mutable(context, image): raise exception.Forbidden(_('You do not own this image')) +def check_admin_or_same_owner(context, properties): + """Check that legacy behavior on create with owner is preserved. + + Legacy behavior requires a static check that owner is not + inconsistent with the context, unless the caller is an + admin. Enforce that here, if needed. + + :param context: A RequestContext + :param properties: The properties being used to create the image, which may + contain an owner + :raises: exception.Forbidden if the context is not an admin and owner is + set to something other than the context's project + """ + if context.is_admin: + return + + if context.project_id != properties.get('owner', context.project_id): + msg = _("You are not permitted to create images " + "owned by '%s'.") + raise exception.Forbidden(msg % properties['owner']) + + class APIPolicyBase(object): def __init__(self, context, target=None, enforcer=None): self._context = context @@ -84,16 +106,53 @@ class APIPolicyBase(object): class ImageAPIPolicy(APIPolicyBase): def __init__(self, context, image, enforcer=None): - super(ImageAPIPolicy, self).__init__(context, - policy.ImageTarget(image), - enforcer) + """Image API policy module. + + :param context: The RequestContext + :param image: The ImageProxy object in question, or a dict of image + properties if no image is yet created or needed for + authorization context. + :param enforcer: The policy.Enforcer object to use for enforcement + operations. If not provided (or None), the default + enforcer will be selected. + """ self._image = image + if not self.is_created: + # NOTE(danms): If we are being called with a dict of image + # properties then we are testing policies that involve + # creating an image or other image-related resources but + # without a specific image for context. The target is a + # dict of proposed image properties, similar to the + # dict-like interface the ImageTarget provides over + # a real Image object, with specific keys. + target = {'project_id': image.get('owner', context.project_id), + 'owner': image.get('owner', context.project_id), + 'visibility': image.get('visibility', 'private')} + else: + target = policy.ImageTarget(image) + super(ImageAPIPolicy, self).__init__(context, target, enforcer) + + @property + def is_created(self): + """Signal whether the image actually exists or not. + + False if the image is only being proposed by a create operation, + True if it has already been created. + """ + return not isinstance(self._image, dict) def _enforce(self, rule_name): """Translate Forbidden->NotFound for images.""" try: super(ImageAPIPolicy, self)._enforce(rule_name) except webob.exc.HTTPForbidden: + # If we are checking image policy before creating an + # image, or without a specific image for context, then we + # do not need to potentially hide the presence of anything + # based on visibility, so re-raise immediately. + if not self.is_created: + raise + # If we are checking get_image, then Forbidden means the # user cannot see this image, so raise NotFound. If we are # checking anything else and get Forbidden, then raise @@ -143,6 +202,25 @@ class ImageAPIPolicy(APIPolicyBase): def get_image_location(self): self._enforce('get_image_location') + def add_image(self): + try: + self._enforce('add_image') + except webob.exc.HTTPForbidden: + # NOTE(danms): If we fail add_image because the owner is + # different, alter the message to be informative and + # in-line with the current message users have been getting + # in the past. + if self._target['owner'] != self._context.project_id: + msg = _("You are not permitted to create images " + "owned by '%s'" % self._target['owner']) + raise webob.exc.HTTPForbidden(msg) + else: + raise + if 'visibility' in self._target: + self._enforce_visibility(self._target['visibility']) + if not CONF.enforce_secure_rbac: + check_admin_or_same_owner(self._context, self._target) + def get_image(self): self._enforce('get_image') diff --git a/glance/policies/base.py b/glance/policies/base.py index 8907e3f9b..5bc1ac903 100644 --- a/glance/policies/base.py +++ b/glance/policies/base.py @@ -69,6 +69,9 @@ ADMIN_OR_PROJECT_MEMBER_DOWNLOAD_IMAGE = ( f'role:admin or ' f'({PROJECT_MEMBER_OR_IMAGE_MEMBER_OR_COMMUNITY_OR_PUBLIC_OR_SHARED})' ) +ADMIN_OR_PROJECT_MEMBER_CREATE_IMAGE = ( + f'role:admin or ({PROJECT_MEMBER} and project_id:%(owner)s)' +) ADMIN_OR_SHARED_MEMBER = ( diff --git a/glance/policies/image.py b/glance/policies/image.py index 9d2f91344..7db4e139f 100644 --- a/glance/policies/image.py +++ b/glance/policies/image.py @@ -23,7 +23,7 @@ The image API now supports roles. image_policies = [ policy.DocumentedRuleDefault( name="add_image", - check_str=base.ADMIN_OR_PROJECT_MEMBER, + check_str=base.ADMIN_OR_PROJECT_MEMBER_CREATE_IMAGE, scope_types=['system', 'project'], description='Create new image', operations=[ diff --git a/glance/tests/functional/v2/test_images_api_policy.py b/glance/tests/functional/v2/test_images_api_policy.py index 7d02cd200..0cfa3bb3b 100644 --- a/glance/tests/functional/v2/test_images_api_policy.py +++ b/glance/tests/functional/v2/test_images_api_policy.py @@ -206,6 +206,36 @@ class TestImagesPolicy(functional.SynchronousAPIBase): resp = self.api_get('/v2/images/%s' % image_id) self.assertEqual(404, resp.status_code) + def test_image_create(self): + self.start_server() + + # Make sure we can create an image + self.assertEqual(201, self._create().status_code) + + # Now disable add_image and make sure we get 403 + self.set_policy_rules({'add_image': '!'}) + + self.assertEqual(403, self._create().status_code) + + def test_image_create_by_another(self): + self.start_server() + + # NOTE(danms): There is no policy override in this test, + # specifically to test that the defaults (for rbac and + # non-rbac) properly catch the attempt by a non-admin to + # create an image owned by someone else. + + image = {'name': 'foo', + 'container_format': 'bare', + 'disk_format': 'raw', + 'owner': 'someoneelse'} + resp = self.api_post('/v2/images', + json=image, + headers={'X-Roles': 'member'}) + # Make sure we get the expected owner-specific error message + self.assertIn("You are not permitted to create images " + "owned by 'someoneelse'", resp.text) + def test_image_delete(self): self.start_server() diff --git a/glance/tests/unit/v2/test_v2_policy.py b/glance/tests/unit/v2/test_v2_policy.py index 205d2ae70..80b6fab2c 100644 --- a/glance/tests/unit/v2/test_v2_policy.py +++ b/glance/tests/unit/v2/test_v2_policy.py @@ -223,6 +223,63 @@ class APIImagePolicy(APIPolicyBase): 'get_images', mock.ANY) + def test_add_image(self): + generic_target = {'project_id': self.context.project_id, + 'owner': self.context.project_id, + 'visibility': 'private'} + self.policy = policy.ImageAPIPolicy(self.context, {}, + enforcer=self.enforcer) + self.policy.add_image() + self.enforcer.enforce.assert_called_once_with(self.context, + 'add_image', + generic_target) + + def test_add_image_falls_back_to_legacy(self): + self.config(enforce_secure_rbac=False) + + self.context.is_admin = False + self.policy = policy.ImageAPIPolicy(self.context, {'owner': 'else'}, + enforcer=self.enforcer) + self.assertRaises(exception.Forbidden, self.policy.add_image) + + # Make sure we're calling the legacy handler if secure_rbac is False + with mock.patch('glance.api.v2.policy.check_admin_or_same_owner') as m: + self.policy.add_image() + m.assert_called_once_with(self.context, {'project_id': 'else', + 'owner': 'else', + 'visibility': 'private'}) + + # Make sure we are not calling the legacy handler if + # secure_rbac is being used. We won't fail the check because + # our enforcer is a mock, just make sure we don't call that handler. + self.config(enforce_secure_rbac=True) + with mock.patch('glance.api.v2.policy.check_admin_or_same_owner') as m: + self.policy.add_image() + m.assert_not_called() + + def test_add_image_translates_owner_failure(self): + self.policy = policy.ImageAPIPolicy(self.context, {'owner': 'else'}, + enforcer=self.enforcer) + # Make sure add_image works with no exception + self.policy.add_image() + + # Make sure we don't get in the way of any other exceptions + self.enforcer.enforce.side_effect = exception.Duplicate + self.assertRaises(exception.Duplicate, self.policy.add_image) + + # If the exception is HTTPForbidden and the owner differs, + # make sure we get the proper message translation + self.enforcer.enforce.side_effect = webob.exc.HTTPForbidden('original') + exc = self.assertRaises(webob.exc.HTTPForbidden, self.policy.add_image) + self.assertIn('You are not permitted to create images owned by', + str(exc)) + + # If the owner does not differ, make sure we get the original reason + self.policy = policy.ImageAPIPolicy(self.context, {}, + enforcer=self.enforcer) + exc = self.assertRaises(webob.exc.HTTPForbidden, self.policy.add_image) + self.assertIn('original', str(exc)) + def test_delete_image(self): self.policy.delete_image() self.enforcer.enforce.assert_called_once_with(self.context, |