summaryrefslogtreecommitdiff
path: root/glance/tests/functional/v2/test_metadef_tags.py
diff options
context:
space:
mode:
Diffstat (limited to 'glance/tests/functional/v2/test_metadef_tags.py')
-rw-r--r--glance/tests/functional/v2/test_metadef_tags.py186
1 files changed, 183 insertions, 3 deletions
diff --git a/glance/tests/functional/v2/test_metadef_tags.py b/glance/tests/functional/v2/test_metadef_tags.py
index 520a1ce13..cac6b639d 100644
--- a/glance/tests/functional/v2/test_metadef_tags.py
+++ b/glance/tests/functional/v2/test_metadef_tags.py
@@ -13,15 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import uuid
-
from oslo_serialization import jsonutils
+from oslo_utils.fixture import uuidsentinel as uuids
import requests
from six.moves import http_client as http
from glance.tests import functional
-TENANT1 = str(uuid.uuid4())
+TENANT1 = uuids.owner1
+TENANT2 = uuids.owner2
class TestMetadefTags(functional.FunctionalTest):
@@ -176,3 +176,183 @@ class TestMetadefTags(functional.FunctionalTest):
self.assertEqual(http.OK, response.status_code)
tags = jsonutils.loads(response.text)['tags']
self.assertEqual(3, len(tags))
+
+ def _create_namespace(self, path, headers, data):
+ response = requests.post(path, headers=headers, json=data)
+ self.assertEqual(http.CREATED, response.status_code)
+
+ return response.json()
+
+ def _create_tags(self, namespaces):
+ tags = []
+ for namespace in namespaces:
+ headers = self._headers({'X-Tenant-Id': namespace['owner']})
+ tag_name = "tag_of_%s" % (namespace['namespace'])
+ path = self._url('/v2/metadefs/namespaces/%s/tags/%s' %
+ (namespace['namespace'], tag_name))
+ response = requests.post(path, headers=headers)
+ self.assertEqual(http.CREATED, response.status_code)
+ tag_metadata = response.json()
+ metadef_tags = dict()
+ metadef_tags[namespace['namespace']] = tag_metadata['name']
+ tags.append(metadef_tags)
+
+ return tags
+
+ def _update_tags(self, path, headers, data):
+ # The tag should be mutable
+ response = requests.put(path, headers=headers, json=data)
+ self.assertEqual(http.OK, response.status_code, response.text)
+ # Returned metadata_tag should reflect the changes
+ metadata_tag = response.json()
+ self.assertEqual(data['name'], metadata_tag['name'])
+
+ # Updates should persist across requests
+ response = requests.get(path, headers=self._headers())
+ self.assertEqual(http.OK, response.status_code)
+ self.assertEqual(data['name'], metadata_tag['name'])
+
+ def test_role_base_metadata_tags_lifecycle(self):
+ # Create public and private namespaces for tenant1 and tenant2
+ path = self._url('/v2/metadefs/namespaces')
+ headers = self._headers({'content-type': 'application/json'})
+ tenant1_namespaces = []
+ tenant2_namespaces = []
+ for tenant in [TENANT1, TENANT2]:
+ headers['X-Tenant-Id'] = tenant
+ for visibility in ['public', 'private']:
+ data = {
+ "namespace": "%s_%s_namespace" % (tenant, visibility),
+ "display_name": "My User Friendly Namespace",
+ "description": "My description",
+ "visibility": visibility,
+ "owner": tenant
+ }
+ namespace = self._create_namespace(path, headers, data)
+ if tenant == TENANT1:
+ tenant1_namespaces.append(namespace)
+ else:
+ tenant2_namespaces.append(namespace)
+
+ # Create a metadef tags for each namespace created above
+ tenant1_tags = self._create_tags(tenant1_namespaces)
+ tenant2_tags = self._create_tags(tenant2_namespaces)
+
+ def _check_tag_access(tags, tenant):
+ headers = self._headers({'content-type': 'application/json',
+ 'X-Tenant-Id': tenant,
+ 'X-Roles': 'reader,member'})
+ for tag in tags:
+ for namespace, tag_name in tag.items():
+ path = self._url('/v2/metadefs/namespaces/%s/tags/%s' %
+ (namespace, tag_name))
+ response = requests.get(path, headers=headers)
+ if namespace.split('_')[1] == 'public':
+ expected = http.OK
+ else:
+ expected = http.NOT_FOUND
+
+ # Make sure we can see all public and own private tags,
+ # but not the private tags of other tenant's
+ self.assertEqual(expected, response.status_code)
+
+ # Make sure the same holds for listing
+ path = self._url(
+ '/v2/metadefs/namespaces/%s/tags' % namespace)
+ response = requests.get(path, headers=headers)
+ self.assertEqual(expected, response.status_code)
+ if expected == http.OK:
+ resp_props = response.json()['tags']
+ self.assertEqual(
+ sorted(tag.values()),
+ sorted([x['name']
+ for x in resp_props]))
+
+ # Check Tenant 1 can access tags of all public namespace
+ # and cannot access tags of private namespace of Tenant 2
+ _check_tag_access(tenant2_tags, TENANT1)
+
+ # Check Tenant 2 can access tags of public namespace and
+ # cannot access tags of private namespace of Tenant 1
+ _check_tag_access(tenant1_tags, TENANT2)
+
+ # Update tags with admin and non admin role
+ total_tags = tenant1_tags + tenant2_tags
+ for tag in total_tags:
+ for namespace, tag_name in tag.items():
+ data = {
+ "name": tag_name}
+ path = self._url('/v2/metadefs/namespaces/%s/tags/%s' %
+ (namespace, tag_name))
+
+ # Update tag should fail with non admin role
+ headers['X-Roles'] = "reader,member"
+ response = requests.put(path, headers=headers, json=data)
+ self.assertEqual(http.FORBIDDEN, response.status_code)
+
+ # Should work with admin role
+ headers = self._headers({
+ 'X-Tenant-Id': namespace.split('_')[0]})
+ self._update_tags(path, headers, data)
+
+ # Delete tags should not be allowed to non admin role
+ for tag in total_tags:
+ for namespace, tag_name in tag.items():
+ path = self._url('/v2/metadefs/namespaces/%s/tags/%s' %
+ (namespace, tag_name))
+ response = requests.delete(
+ path, headers=self._headers({
+ 'X-Roles': 'reader,member',
+ 'X-Tenant-Id': namespace.split('_')[0]
+ }))
+ self.assertEqual(http.FORBIDDEN, response.status_code)
+
+ # Delete all metadef tags
+ headers = self._headers()
+ for tag in total_tags:
+ for namespace, tag_name in tag.items():
+ path = self._url('/v2/metadefs/namespaces/%s/tags/%s' %
+ (namespace, tag_name))
+ response = requests.delete(path, headers=headers)
+ self.assertEqual(http.NO_CONTENT, response.status_code)
+
+ # Deleted tags should not be exist
+ response = requests.get(path, headers=headers)
+ self.assertEqual(http.NOT_FOUND, response.status_code)
+
+ # Create multiple tags should not be allowed to non admin role
+ headers = self._headers({'content-type': 'application/json',
+ 'X-Roles': 'reader,member'})
+ data = {
+ "tags": [{"name": "tag1"}, {"name": "tag2"}, {"name": "tag3"}]
+ }
+ for namespace in tenant1_namespaces:
+ path = self._url('/v2/metadefs/namespaces/%s/tags' %
+ (namespace['namespace']))
+ response = requests.post(path, headers=headers, json=data)
+ self.assertEqual(http.FORBIDDEN, response.status_code)
+
+ # Create multiple tags.
+ headers = self._headers({'content-type': 'application/json'})
+ for namespace in tenant1_namespaces:
+ path = self._url('/v2/metadefs/namespaces/%s/tags' %
+ (namespace['namespace']))
+ response = requests.post(path, headers=headers, json=data)
+ self.assertEqual(http.CREATED, response.status_code)
+
+ # Delete multiple tags should not be allowed with non admin role
+ headers = self._headers({'content-type': 'application/json',
+ 'X-Roles': 'reader,member'})
+ for namespace in tenant1_namespaces:
+ path = self._url('/v2/metadefs/namespaces/%s/tags' %
+ (namespace['namespace']))
+ response = requests.delete(path, headers=headers)
+ self.assertEqual(http.FORBIDDEN, response.status_code)
+
+ # Delete multiple tags created above created tags
+ headers = self._headers()
+ for namespace in tenant1_namespaces:
+ path = self._url('/v2/metadefs/namespaces/%s/tags' %
+ (namespace['namespace']))
+ response = requests.delete(path, headers=headers)
+ self.assertEqual(http.NO_CONTENT, response.status_code)