summaryrefslogtreecommitdiff
path: root/saharaclient/api
diff options
context:
space:
mode:
Diffstat (limited to 'saharaclient/api')
-rw-r--r--saharaclient/api/__init__.py0
-rw-r--r--saharaclient/api/base.py155
-rw-r--r--saharaclient/api/client.py107
-rw-r--r--saharaclient/api/cluster_templates.py70
-rw-r--r--saharaclient/api/clusters.py74
-rw-r--r--saharaclient/api/data_sources.py47
-rw-r--r--saharaclient/api/helpers.py76
-rw-r--r--saharaclient/api/httpclient.py40
-rw-r--r--saharaclient/api/images.py68
-rw-r--r--saharaclient/api/job_binaries.py50
-rw-r--r--saharaclient/api/job_binary_internals.py38
-rw-r--r--saharaclient/api/job_executions.py50
-rw-r--r--saharaclient/api/jobs.py47
-rw-r--r--saharaclient/api/node_group_templates.py84
-rw-r--r--saharaclient/api/parameters.py26
-rw-r--r--saharaclient/api/plugins.py56
-rw-r--r--saharaclient/api/shell.py778
17 files changed, 1766 insertions, 0 deletions
diff --git a/saharaclient/api/__init__.py b/saharaclient/api/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/saharaclient/api/__init__.py
diff --git a/saharaclient/api/base.py b/saharaclient/api/base.py
new file mode 100644
index 0000000..cf4c493
--- /dev/null
+++ b/saharaclient/api/base.py
@@ -0,0 +1,155 @@
+# Copyright (c) 2013 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import logging
+import six
+
+LOG = logging.getLogger(__name__)
+
+
+class Resource(object):
+ resource_name = 'Something'
+ defaults = {}
+
+ def __init__(self, manager, info):
+ self.manager = manager
+ info = info.copy()
+ self._info = info
+ self._set_defaults(info)
+ self._add_details(info)
+
+ def _set_defaults(self, info):
+ for name, value in six.iteritems(self.defaults):
+ if name not in info:
+ info[name] = value
+
+ def _add_details(self, info):
+ for (k, v) in six.iteritems(info):
+ try:
+ setattr(self, k, v)
+ self._info[k] = v
+ except AttributeError:
+ # In this case we already defined the attribute on the class
+ pass
+
+ def __str__(self):
+ return '%s %s' % (self.resource_name, str(self._info))
+
+
+def _check_items(obj, searches):
+ try:
+ return all(getattr(obj, attr) == value for (attr, value) in searches)
+ except AttributeError:
+ return False
+
+
+class ResourceManager(object):
+ resource_class = None
+
+ def __init__(self, api):
+ self.api = api
+
+ def find(self, **kwargs):
+ return [i for i in self.list() if _check_items(i, kwargs.items())]
+
+ def _copy_if_defined(self, data, **kwargs):
+ for var_name, var_value in six.iteritems(kwargs):
+ if var_value is not None:
+ data[var_name] = var_value
+
+ def _create(self, url, data, response_key=None, dump_json=True):
+ if dump_json:
+ data = json.dumps(data)
+ resp = self.api.client.post(url, data)
+
+ if resp.status_code != 202:
+ self._raise_api_exception(resp)
+
+ if response_key is not None:
+ data = get_json(resp)[response_key]
+ else:
+ data = get_json(resp)
+ return self.resource_class(self, data)
+
+ def _update(self, url, data, response_key=None, dump_json=True):
+ if dump_json:
+ data = json.dumps(data)
+ resp = self.api.client.put(url, data)
+
+ if resp.status_code != 202:
+ self._raise_api_exception(resp)
+ if response_key is not None:
+ data = get_json(resp)[response_key]
+ else:
+ data = get_json(resp)
+ return self.resource_class(self, data)
+
+ def _list(self, url, response_key):
+ resp = self.api.client.get(url)
+ if resp.status_code == 200:
+ data = get_json(resp)[response_key]
+
+ return [self.resource_class(self, res)
+ for res in data]
+ else:
+ self._raise_api_exception(resp)
+
+ def _get(self, url, response_key=None):
+ resp = self.api.client.get(url)
+
+ if resp.status_code == 200:
+ if response_key is not None:
+ data = get_json(resp)[response_key]
+ else:
+ data = get_json(resp)
+ return self.resource_class(self, data)
+ else:
+ self._raise_api_exception(resp)
+
+ def _delete(self, url):
+ resp = self.api.client.delete(url)
+
+ if resp.status_code != 204:
+ self._raise_api_exception(resp)
+
+ def _plurify_resource_name(self):
+ return self.resource_class.resource_name + 's'
+
+ def _raise_api_exception(self, resp):
+ error_data = get_json(resp)
+ raise APIException(error_code=error_data.get("error_code"),
+ error_name=error_data.get("error_name"),
+ error_message=error_data.get("error_message"))
+
+
+def get_json(response):
+ """This method provided backward compatibility with old versions
+ of requests library
+
+ """
+ json_field_or_function = getattr(response, 'json', None)
+ if callable(json_field_or_function):
+ return response.json()
+ else:
+ return json.loads(response.content)
+
+
+class APIException(Exception):
+ def __init__(self, error_code=None, error_name=None, error_message=None):
+ super(APIException, self).__init__(error_message)
+ self.error_code = error_code
+ self.error_name = error_name
+ self.error_message = error_message
diff --git a/saharaclient/api/client.py b/saharaclient/api/client.py
new file mode 100644
index 0000000..bbb701b
--- /dev/null
+++ b/saharaclient/api/client.py
@@ -0,0 +1,107 @@
+# Copyright (c) 2013 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from keystoneclient.v2_0 import client as keystone_client_v2
+from keystoneclient.v3 import client as keystone_client_v3
+
+from saharaclient.api import cluster_templates
+from saharaclient.api import clusters
+from saharaclient.api import data_sources
+from saharaclient.api import httpclient
+from saharaclient.api import images
+from saharaclient.api import job_binaries
+from saharaclient.api import job_binary_internals
+from saharaclient.api import job_executions
+from saharaclient.api import jobs
+from saharaclient.api import node_group_templates
+from saharaclient.api import plugins
+
+
+class Client(object):
+ def __init__(self, username=None, api_key=None, project_id=None,
+ project_name=None, auth_url=None, savanna_url=None,
+ endpoint_type='publicURL', service_type='data_processing',
+ input_auth_token=None):
+
+ if not input_auth_token:
+ keystone = self.get_keystone_client(username=username,
+ api_key=api_key,
+ auth_url=auth_url,
+ project_id=project_id,
+ project_name=project_name)
+ input_auth_token = keystone.auth_token
+ if not input_auth_token:
+ raise RuntimeError("Not Authorized")
+
+ savanna_catalog_url = savanna_url
+ if not savanna_url:
+ keystone = self.get_keystone_client(username=username,
+ api_key=api_key,
+ auth_url=auth_url,
+ token=input_auth_token,
+ project_id=project_id,
+ project_name=project_name)
+ catalog = keystone.service_catalog.get_endpoints(service_type)
+ if service_type in catalog:
+ for e_type, endpoint in catalog.get(service_type)[0].items():
+ if str(e_type).lower() == str(endpoint_type).lower():
+ savanna_catalog_url = endpoint
+ break
+ if not savanna_catalog_url:
+ raise RuntimeError("Could not find Sahara endpoint in catalog")
+
+ self.client = httpclient.HTTPClient(savanna_catalog_url,
+ input_auth_token)
+
+ self.clusters = clusters.ClusterManager(self)
+ self.cluster_templates = cluster_templates.ClusterTemplateManager(self)
+ self.node_group_templates = (node_group_templates.
+ NodeGroupTemplateManager(self))
+ self.plugins = plugins.PluginManager(self)
+ self.images = images.ImageManager(self)
+
+ self.data_sources = data_sources.DataSourceManager(self)
+ self.jobs = jobs.JobsManager(self)
+ self.job_executions = job_executions.JobExecutionsManager(self)
+ self.job_binaries = job_binaries.JobBinariesManager(self)
+ self.job_binary_internals =\
+ job_binary_internals.JobBinaryInternalsManager(self)
+
+ def get_keystone_client(self, username=None, api_key=None, auth_url=None,
+ token=None, project_id=None, project_name=None):
+ if not auth_url:
+ raise RuntimeError("No auth url specified")
+ imported_client = keystone_client_v2 if "v2.0" in auth_url\
+ else keystone_client_v3
+ if not getattr(self, "keystone_client", None):
+ self.keystone_client = imported_client.Client(
+ username=username,
+ password=api_key,
+ token=token,
+ tenant_id=project_id,
+ tenant_name=project_name,
+ auth_url=auth_url,
+ endpoint=auth_url)
+
+ self.keystone_client.authenticate()
+
+ return self.keystone_client
+
+ @staticmethod
+ def get_projects_list(keystone_client):
+ if isinstance(keystone_client, keystone_client_v2.Client):
+ return keystone_client.tenants
+
+ return keystone_client.projects
diff --git a/saharaclient/api/cluster_templates.py b/saharaclient/api/cluster_templates.py
new file mode 100644
index 0000000..f9ca583
--- /dev/null
+++ b/saharaclient/api/cluster_templates.py
@@ -0,0 +1,70 @@
+# Copyright (c) 2013 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from saharaclient.api import base
+
+
+class ClusterTemplate(base.Resource):
+ resource_name = 'Cluster Template'
+
+
+class ClusterTemplateManager(base.ResourceManager):
+ resource_class = ClusterTemplate
+
+ def _assign_field(self, name, plugin_name, hadoop_version,
+ description=None, cluster_configs=None, node_groups=None,
+ anti_affinity=None, net_id=None):
+ data = {
+ 'name': name,
+ 'plugin_name': plugin_name,
+ 'hadoop_version': hadoop_version,
+ }
+
+ self._copy_if_defined(data,
+ description=description,
+ cluster_configs=cluster_configs,
+ node_groups=node_groups,
+ anti_affinity=anti_affinity,
+ neutron_management_network=net_id)
+ return data
+
+ def create(self, name, plugin_name, hadoop_version, description=None,
+ cluster_configs=None, node_groups=None, anti_affinity=None,
+ net_id=None):
+ data = self._assign_field(name, plugin_name, hadoop_version,
+ description, cluster_configs, node_groups,
+ anti_affinity, net_id)
+
+ return self._create('/cluster-templates', data, 'cluster_template')
+
+ def update(self, cluster_template_id, name, plugin_name, hadoop_version,
+ description=None, cluster_configs=None, node_groups=None,
+ anti_affinity=None, net_id=None):
+ data = self._assign_field(name, plugin_name, hadoop_version,
+ description, cluster_configs, node_groups,
+ anti_affinity, net_id)
+
+ return self._update('/cluster-templates/%s' % cluster_template_id,
+ data, 'cluster_template')
+
+ def list(self):
+ return self._list('/cluster-templates', 'cluster_templates')
+
+ def get(self, cluster_template_id):
+ return self._get('/cluster-templates/%s' % cluster_template_id,
+ 'cluster_template')
+
+ def delete(self, cluster_template_id):
+ self._delete('/cluster-templates/%s' % cluster_template_id)
diff --git a/saharaclient/api/clusters.py b/saharaclient/api/clusters.py
new file mode 100644
index 0000000..ec9a822
--- /dev/null
+++ b/saharaclient/api/clusters.py
@@ -0,0 +1,74 @@
+# Copyright (c) 2013 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import six
+
+from saharaclient.api import base
+
+
+class Cluster(base.Resource):
+ resource_name = 'Cluster'
+
+
+class ClusterManager(base.ResourceManager):
+ resource_class = Cluster
+
+ def _assert_variables(self, **kwargs):
+ for var_name, var_value in six.iteritems(kwargs):
+ if var_value is None:
+ raise base.APIException('Cluster is missing field "%s"' %
+ var_name)
+
+ def create(self, name, plugin_name, hadoop_version,
+ cluster_template_id=None, default_image_id=None,
+ is_transient=None, description=None, cluster_configs=None,
+ node_groups=None, user_keypair_id=None,
+ anti_affinity=None, net_id=None):
+
+ data = {
+ 'name': name,
+ 'plugin_name': plugin_name,
+ 'hadoop_version': hadoop_version,
+ }
+
+ if cluster_template_id is None:
+ self._assert_variables(default_image_id=default_image_id,
+ cluster_configs=cluster_configs,
+ node_groups=node_groups)
+
+ self._copy_if_defined(data,
+ cluster_template_id=cluster_template_id,
+ is_transient=is_transient,
+ default_image_id=default_image_id,
+ description=description,
+ cluster_configs=cluster_configs,
+ node_groups=node_groups,
+ user_keypair_id=user_keypair_id,
+ anti_affinity=anti_affinity,
+ neutron_management_network=net_id)
+
+ return self._create('/clusters', data, 'cluster')
+
+ def scale(self, cluster_id, scale_object):
+ return self._update('/clusters/%s' % cluster_id, scale_object)
+
+ def list(self):
+ return self._list('/clusters', 'clusters')
+
+ def get(self, cluster_id):
+ return self._get('/clusters/%s' % cluster_id, 'cluster')
+
+ def delete(self, cluster_id):
+ self._delete('/clusters/%s' % cluster_id)
diff --git a/saharaclient/api/data_sources.py b/saharaclient/api/data_sources.py
new file mode 100644
index 0000000..7ba0059
--- /dev/null
+++ b/saharaclient/api/data_sources.py
@@ -0,0 +1,47 @@
+# Copyright (c) 2013 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from saharaclient.api import base
+
+
+class DataSources(base.Resource):
+ resource_name = 'Data Source'
+
+
+class DataSourceManager(base.ResourceManager):
+ resource_class = DataSources
+
+ def create(self, name, description, data_source_type,
+ url, credential_user=None, credential_pass=None):
+ data = {
+ 'name': name,
+ 'description': description,
+ 'type': data_source_type,
+ 'url': url,
+ 'credentials': {}
+ }
+ self._copy_if_defined(data['credentials'],
+ user=credential_user,
+ password=credential_pass)
+ return self._create('/data-sources', data, 'data_source')
+
+ def list(self):
+ return self._list('/data-sources', 'data_sources')
+
+ def get(self, data_source_id):
+ return self._get('/data-sources/%s' % data_source_id, 'data_source')
+
+ def delete(self, data_source_id):
+ self._delete('/data-sources/%s' % data_source_id)
diff --git a/saharaclient/api/helpers.py b/saharaclient/api/helpers.py
new file mode 100644
index 0000000..79495c9
--- /dev/null
+++ b/saharaclient/api/helpers.py
@@ -0,0 +1,76 @@
+# Copyright (c) 2013 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from saharaclient.api import parameters as params
+
+
+class Helpers(object):
+ def __init__(self, savanna_client):
+ self.savanna = savanna_client
+ self.plugins = self.savanna.plugins
+
+ def _get_node_processes(self, plugin):
+ processes = []
+ for proc_lst in plugin.node_processes.values():
+ processes += proc_lst
+
+ return [(proc_name, proc_name) for proc_name in processes]
+
+ def get_node_processes(self, plugin_name, hadoop_version):
+ plugin = self.plugins.get_version_details(plugin_name, hadoop_version)
+
+ return self._get_node_processes(plugin)
+
+ def _extract_parameters(self, configs, scope, applicable_target):
+ parameters = []
+ for config in configs:
+ if (config['scope'] == scope and
+ config['applicable_target'] == applicable_target):
+
+ parameters.append(params.Parameter(config))
+
+ return parameters
+
+ def get_cluster_general_configs(self, plugin_name, hadoop_version):
+ plugin = self.plugins.get_version_details(plugin_name, hadoop_version)
+
+ return self._extract_parameters(plugin.configs, 'cluster', "general")
+
+ def get_general_node_group_configs(self, plugin_name, hadoop_version):
+ plugin = self.plugins.get_version_details(plugin_name, hadoop_version)
+
+ return self._extract_parameters(plugin.configs, 'node', 'general')
+
+ def get_targeted_node_group_configs(self, plugin_name, hadoop_version):
+ plugin = self.plugins.get_version_details(plugin_name, hadoop_version)
+
+ parameters = dict()
+
+ for service in plugin.node_processes.keys():
+ parameters[service] = self._extract_parameters(plugin.configs,
+ 'node', service)
+
+ return parameters
+
+ def get_targeted_cluster_configs(self, plugin_name, hadoop_version):
+ plugin = self.plugins.get_version_details(plugin_name, hadoop_version)
+
+ parameters = dict()
+
+ for service in plugin.node_processes.keys():
+ parameters[service] = self._extract_parameters(plugin.configs,
+ 'cluster', service)
+
+ return parameters
diff --git a/saharaclient/api/httpclient.py b/saharaclient/api/httpclient.py
new file mode 100644
index 0000000..946e9c3
--- /dev/null
+++ b/saharaclient/api/httpclient.py
@@ -0,0 +1,40 @@
+# Copyright (c) 2013 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import requests
+
+
+class HTTPClient(object):
+ def __init__(self, base_url, token):
+ self.base_url = base_url
+ self.token = token
+
+ def get(self, url):
+ return requests.get(self.base_url + url,
+ headers={'x-auth-token': self.token})
+
+ def post(self, url, body):
+ return requests.post(self.base_url + url, body,
+ headers={'x-auth-token': self.token,
+ 'content-type': 'application/json'})
+
+ def put(self, url, body):
+ return requests.put(self.base_url + url, body,
+ headers={'x-auth-token': self.token,
+ 'content-type': 'application/json'})
+
+ def delete(self, url):
+ return requests.delete(self.base_url + url,
+ headers={'x-auth-token': self.token})
diff --git a/saharaclient/api/images.py b/saharaclient/api/images.py
new file mode 100644
index 0000000..a705e15
--- /dev/null
+++ b/saharaclient/api/images.py
@@ -0,0 +1,68 @@
+# Copyright (c) 2013 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+from saharaclient.api import base
+
+
+class Image(base.Resource):
+ resource_name = 'Image'
+ defaults = {'description': ''}
+
+
+class ImageManager(base.ResourceManager):
+ resource_class = Image
+
+ def list(self):
+ return self._list('/images', 'images')
+
+ def get(self, id):
+ return self._get('/images/%s' % id, 'image')
+
+ def unregister_image(self, image_id):
+ self._delete('/images/%s' % image_id)
+
+ def update_image(self, image_id, user_name, desc):
+ body = {"username": user_name,
+ "description": desc}
+
+ resp = self.api.client.post('/images/%s' % image_id, json.dumps(body))
+ if resp.status_code != 202:
+ raise RuntimeError('Failed to register image %s' % image_id)
+
+ def update_tags(self, image_id, new_tags):
+ old_image = self.get(image_id)
+
+ old_tags = frozenset(old_image.tags)
+ new_tags = frozenset(new_tags)
+
+ to_add = list(new_tags - old_tags)
+ to_remove = list(old_tags - new_tags)
+
+ if len(to_add) != 0:
+ resp = self.api.client.post('/images/%s/tag' % image_id,
+ json.dumps({'tags': to_add}))
+
+ if resp.status_code != 202:
+ raise RuntimeError('Failed to add tags to image %s' % image_id)
+
+ if len(to_remove) != 0:
+ resp = self.api.client.post('/images/%s/untag' % image_id,
+ json.dumps({'tags': to_remove}))
+
+ if resp.status_code != 202:
+ raise RuntimeError('Failed to remove tags from image %s' %
+ image_id)
diff --git a/saharaclient/api/job_binaries.py b/saharaclient/api/job_binaries.py
new file mode 100644
index 0000000..a1e4b9e
--- /dev/null
+++ b/saharaclient/api/job_binaries.py
@@ -0,0 +1,50 @@
+# Copyright (c) 2013 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from saharaclient.api import base
+
+
+class JobBinaries(base.Resource):
+ resource_name = 'Job Binary'
+
+
+class JobBinariesManager(base.ResourceManager):
+ resource_class = JobBinaries
+
+ def create(self, name, url, description, extra):
+ data = {
+ "name": name,
+ "url": url,
+ "description": description,
+ "extra": extra
+ }
+
+ return self._create('/job-binaries', data, 'job_binary')
+
+ def list(self):
+ return self._list('/job-binaries', 'binaries')
+
+ def get(self, job_binary_id):
+ return self._get('/job-binaries/%s' % job_binary_id, 'job_binary')
+
+ def delete(self, job_binary_id):
+ self._delete('/job-binaries/%s' % job_binary_id)
+
+ def get_file(self, job_binary_id):
+ resp = self.api.client.get('/job-binaries/%s/data' % job_binary_id)
+
+ if resp.status_code != 200:
+ self._raise_api_exception(resp)
+ return resp.content
diff --git a/saharaclient/api/job_binary_internals.py b/saharaclient/api/job_binary_internals.py
new file mode 100644
index 0000000..b90d478
--- /dev/null
+++ b/saharaclient/api/job_binary_internals.py
@@ -0,0 +1,38 @@
+# Copyright (c) 2013 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from saharaclient.api import base
+
+
+class JobBinaryInternal(base.Resource):
+ resource_name = 'JobBinaryInternal'
+
+
+class JobBinaryInternalsManager(base.ResourceManager):
+ resource_class = JobBinaryInternal
+
+ def create(self, name, data):
+ return self._update('/job-binary-internals/%s' % name, data,
+ 'job_binary_internal', dump_json=False)
+
+ def list(self):
+ return self._list('/job-binary-internals', 'binaries')
+
+ def get(self, job_binary_id):
+ return self._get('/job-binary-internals/%s' % job_binary_id,
+ 'job_binary_internal')
+
+ def delete(self, job_binary_id):
+ self._delete('/job-binary-internals/%s' % job_binary_id)
diff --git a/saharaclient/api/job_executions.py b/saharaclient/api/job_executions.py
new file mode 100644
index 0000000..68ef4e6
--- /dev/null
+++ b/saharaclient/api/job_executions.py
@@ -0,0 +1,50 @@
+# Copyright (c) 2013 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from saharaclient.api import base
+
+
+class JobExecution(base.Resource):
+ resource_name = 'JobExecution'
+
+
+class JobExecutionsManager(base.ResourceManager):
+ resource_class = JobExecution
+
+ def list(self):
+ return self._list('/job-executions', 'job_executions')
+
+ def get(self, obj_id):
+ return self._get('/job-executions/%s' % obj_id, 'job_execution')
+
+ def delete(self, obj_id):
+ self._delete('/job-executions/%s' % obj_id)
+
+ def create(self, job_id, cluster_id, input_id, output_id, configs):
+ url = "/jobs/%s/execute" % job_id
+ data = {
+ "cluster_id": cluster_id,
+ "job_configs": configs
+ }
+
+ # Leave these out if they are null. For Java job types they
+ # are not part of the schema
+ io_ids = (("input_id", input_id),
+ ("output_id", output_id))
+ for key, value in io_ids:
+ if value is not None:
+ data.update({key: value})
+
+ return self._create(url, data, 'job_execution')
diff --git a/saharaclient/api/jobs.py b/saharaclient/api/jobs.py
new file mode 100644
index 0000000..d3231eb
--- /dev/null
+++ b/saharaclient/api/jobs.py
@@ -0,0 +1,47 @@
+# Copyright (c) 2013 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from saharaclient.api import base
+
+
+class Job(base.Resource):
+ resource_name = 'Job'
+
+
+class JobsManager(base.ResourceManager):
+ resource_class = Job
+
+ def create(self, name, type, mains, libs, description):
+ data = {
+ 'name': name,
+ 'type': type,
+ 'description': description,
+ 'mains': mains,
+ 'libs': libs
+ }
+
+ return self._create('/jobs', data, 'job')
+
+ def list(self):
+ return self._list('/jobs', 'jobs')
+
+ def get(self, job_id):
+ return self._get('/jobs/%s' % job_id, 'job')
+
+ def get_configs(self, job_type):
+ return self._get('/jobs/config-hints/%s' % job_type)
+
+ def delete(self, job_id):
+ self._delete('/jobs/%s' % job_id)
diff --git a/saharaclient/api/node_group_templates.py b/saharaclient/api/node_group_templates.py
new file mode 100644
index 0000000..f9ae36a
--- /dev/null
+++ b/saharaclient/api/node_group_templates.py
@@ -0,0 +1,84 @@
+# Copyright (c) 2013 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from saharaclient.api import base
+
+
+class NodeGroupTemplate(base.Resource):
+ resource_name = 'Node Group Template'
+
+
+class NodeGroupTemplateManager(base.ResourceManager):
+ resource_class = NodeGroupTemplate
+
+ def _assign_field(self, name, plugin_name, hadoop_version, flavor_id,
+ description=None,
+ volumes_per_node=None, volumes_size=None,
+ node_processes=None, node_configs=None,
+ floating_ip_pool=None):
+
+ data = {
+ 'name': name,
+ 'plugin_name': plugin_name,
+ 'hadoop_version': hadoop_version,
+ 'flavor_id': flavor_id,
+ 'node_processes': node_processes
+ }
+
+ self._copy_if_defined(data,
+ description=description,
+ node_configs=node_configs,
+ floating_ip_pool=floating_ip_pool)
+
+ if volumes_per_node:
+ data.update({"volumes_per_node": volumes_per_node,
+ "volumes_size": volumes_size})
+
+ return data
+
+ def create(self, name, plugin_name, hadoop_version, flavor_id,
+ description=None, volumes_per_node=None, volumes_size=None,
+ node_processes=None, node_configs=None, floating_ip_pool=None):
+
+ data = self._assign_field(name, plugin_name, hadoop_version, flavor_id,
+ description, volumes_per_node, volumes_size,
+ node_processes, node_configs,
+ floating_ip_pool)
+
+ return self._create('/node-group-templates', data,
+ 'node_group_template')
+
+ def update(self, ng_template_id, name, plugin_name, hadoop_version,
+ flavor_id, description=None, volumes_per_node=None,
+ volumes_size=None, node_processes=None,
+ node_configs=None, floating_ip_pool=None):
+
+ data = self._assign_field(name, plugin_name, hadoop_version, flavor_id,
+ description, volumes_per_node,
+ volumes_size, node_processes,
+ node_configs, floating_ip_pool)
+
+ return self._update('/node-group-templates/%s' % ng_template_id, data,
+ 'node_group_template')
+
+ def list(self):
+ return self._list('/node-group-templates', 'node_group_templates')
+
+ def get(self, ng_template_id):
+ return self._get('/node-group-templates/%s' % ng_template_id,
+ 'node_group_template')
+
+ def delete(self, ng_template_id):
+ self._delete('/node-group-templates/%s' % ng_template_id)
diff --git a/saharaclient/api/parameters.py b/saharaclient/api/parameters.py
new file mode 100644
index 0000000..4575b56
--- /dev/null
+++ b/saharaclient/api/parameters.py
@@ -0,0 +1,26 @@
+# Copyright (c) 2013 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class Parameter(object):
+ """This bean is used for building config entries."""
+ def __init__(self, config):
+ self.name = config['name']
+ self.description = config.get('description', "No description")
+ self.required = not config['is_optional']
+ self.default_value = config.get('default_value', None)
+ self.initial_value = self.default_value
+ self.param_type = config['config_type']
+ self.priority = int(config.get('priority', 2))
diff --git a/saharaclient/api/plugins.py b/saharaclient/api/plugins.py
new file mode 100644
index 0000000..967da39
--- /dev/null
+++ b/saharaclient/api/plugins.py
@@ -0,0 +1,56 @@
+# Copyright (c) 2013 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from six.moves.urllib import parse as urlparse
+
+from saharaclient.api import base
+
+
+class Plugin(base.Resource):
+ resource_name = 'Plugin'
+
+ def __init__(self, manager, info):
+ base.Resource.__init__(self, manager, info)
+
+ # Horizon requires each object in table to have an id
+ self.id = self.name
+
+
+class PluginManager(base.ResourceManager):
+ resource_class = Plugin
+
+ def list(self):
+ return self._list('/plugins', 'plugins')
+
+ def get(self, plugin_name):
+ return self._get('/plugins/%s' % plugin_name, 'plugin')
+
+ def get_version_details(self, plugin_name, hadoop_version):
+ return self._get('/plugins/%s/%s' % (plugin_name, hadoop_version),
+ 'plugin')
+
+ def convert_to_cluster_template(self, plugin_name, hadoop_version,
+ template_name, filecontent):
+ resp = self.api.client.post('/plugins/%s/%s/convert-config/%s' %
+ (plugin_name,
+ hadoop_version,
+ urlparse.quote(template_name)),
+ filecontent)
+ if resp.status_code != 202:
+ raise RuntimeError('Failed to upload template file for plugin "%s"'
+ ' and version "%s"' %
+ (plugin_name, hadoop_version))
+ else:
+ return base.get_json(resp)['cluster_template']
diff --git a/saharaclient/api/shell.py b/saharaclient/api/shell.py
new file mode 100644
index 0000000..20f2269
--- /dev/null
+++ b/saharaclient/api/shell.py
@@ -0,0 +1,778 @@
+# Copyright 2013 Red Hat, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import datetime
+import inspect
+import json
+from saharaclient.nova import utils
+from saharaclient.openstack.common.apiclient import exceptions
+import sys
+
+
+def _print_list_field(field):
+ return lambda obj: ', '.join(getattr(obj, field))
+
+
+def _print_node_group_field(cluster):
+ return ', '.join(map(lambda x: ': '.join(x),
+ [(node_group['name'],
+ str(node_group['count']))
+ for node_group in cluster.node_groups]))
+
+
+def _show_node_group_template(template):
+ template._info['node_processes'] = (
+ ', '.join(template._info['node_processes'])
+ )
+ utils.print_dict(template._info)
+
+
+def _show_cluster_template(template):
+ template._info['node_groups'] = _print_node_group_field(template)
+ utils.print_dict(template._info)
+
+
+def _show_cluster(cluster):
+ # TODO(mattf): Make this pretty, e.g format node_groups and info urls
+ # Forcing wrap=47 allows for clean display on a terminal of width 80
+ utils.print_dict(cluster._info, wrap=47)
+
+
+def _show_job_binary_data(data):
+ columns = ('id', 'name')
+ utils.print_list(data, columns)
+
+
+def _show_data_source(source):
+ # TODO(mattf): why are we passing credentials around like this?
+ if 'credentials' in source._info:
+ del source._info['credentials']
+ utils.print_dict(source._info)
+
+
+def _show_job_binary(binary):
+ # TODO(mattf): why are we passing credentials around like this?
+ if 'extra' in binary._info:
+ del binary._info['extra']
+ utils.print_dict(binary._info)
+
+
+def _show_job_template(template):
+ # TODO(mattf): Make "mains" property pretty
+ # TODO(mattf): handle/remove "extra" creds
+ utils.print_dict(template._info)
+
+
+def _show_job(job):
+ # TODO(mattf): make display of info pretty, until then
+ # extract the important status information
+ job._info['status'] = job._info['info']['status']
+ del job._info['info']
+ utils.print_dict(job._info)
+
+
+def _get_by_id_or_name(manager, id=None, name=None):
+ if not (name or id):
+ raise exceptions.CommandError("either NAME or ID is required")
+ if id:
+ return manager.get(id)
+ ls = manager.find(name=name)
+ if len(ls) == 0:
+ raise exceptions.CommandError("%s '%s' not found" %
+ (manager.resource_class.resource_name,
+ name))
+ elif len(ls) > 1:
+ raise exceptions.CommandError("%s '%s' not unique, try by ID" %
+ (manager.resource_class.resource_name,
+ name))
+ return ls[0]
+
+
+#
+# Plugins
+# ~~~~~~~
+# plugin-list
+#
+# plugin-show --name <plugin> [--version <version>]
+#
+
+def do_plugin_list(cs, args):
+ """Print a list of available plugins."""
+ plugins = cs.plugins.list()
+ columns = ('name', 'versions', 'title')
+ utils.print_list(plugins, columns,
+ {'versions': _print_list_field('versions')})
+
+
+@utils.arg('--name',
+ metavar='<plugin>',
+ required=True,
+ help='Name of the plugin.')
+# TODO(mattf) - saharaclient does not support query w/ version
+#@utils.arg('--version',
+# metavar='<version>',
+# help='Optional version')
+def do_plugin_show(cs, args):
+ """Show details of a plugin."""
+ plugin = cs.plugins.get(args.name)
+ plugin._info['versions'] = ', '.join(plugin._info['versions'])
+ utils.print_dict(plugin._info)
+
+
+#
+# Image Registry
+# ~~~~~~~~~~~~~~
+# image-list [--tag <tag>]*
+#
+# image-show --name <image>|--id <image_id>
+#
+# image-register --name <image>|--id <image_id>
+# [--username <name>] [--description <desc>]
+#
+# image-unregister --name <image>|--id <image_id>
+#
+# image-add-tag --name <image>|--id <image_id> --tag <tag>+
+#
+# image-remove-tag --name <image>|--id <image_id> --tag <tag>+
+#
+
+# TODO(mattf): [--tag <tag>]*
+def do_image_list(cs, args):
+ """Print a list of available images."""
+ images = cs.images.list()
+ columns = ('name', 'id', 'username', 'tags', 'description')
+ utils.print_list(images, columns, {'tags': _print_list_field('tags')})
+
+
+@utils.arg('--name',
+ help='Name of the image.')
+@utils.arg('--id',
+ metavar='<image_id>',
+ help='ID of the image.')
+def do_image_show(cs, args):
+ """Show details of an image."""
+ image = _get_by_id_or_name(cs.images, args.id, args.name)
+ del image._info['metadata']
+ image._info['tags'] = ', '.join(image._info['tags'])
+ utils.print_dict(image._info)
+
+
+# TODO(mattf): Add --name, must lookup in glance index
+@utils.arg('--id',
+ metavar='<image_id>',
+ required=True,
+ help='ID of image, run "glance image-list" to see all IDs.')
+@utils.arg('--username',
+ default='root',
+ metavar='<name>',
+ help='Username of privileged user in the image.')
+@utils.arg('--description',
+ default='',
+ metavar='<desc>',
+ help='Description of the image.')
+def do_image_register(cs, args):
+ """Register an image from the Image index."""
+ # TODO(mattf): image register should not be through update
+ cs.images.update_image(args.id, args.username, args.description)
+ # TODO(mattf): No indication of result, expect image details
+
+
+@utils.arg('--name',
+ help='Name of the image.')
+@utils.arg('--id',
+ metavar='<image_id>',
+ help='ID of image to unregister.')
+def do_image_unregister(cs, args):
+ """Unregister an image."""
+ cs.images.unregister_image(
+ args.id or _get_by_id_or_name(cs.images, name=args.name).id
+ )
+ # TODO(mattf): No indication of result, expect result to display
+
+
+@utils.arg('--name',
+ help='Name of the image.')
+@utils.arg('--id',
+ metavar='<image_id>',
+ help='ID of image to tag.')
+# TODO(mattf): Change --tag to --tag+
+@utils.arg('--tag',
+ metavar='<tag>',
+ required=True,
+ help='Tag to add.')
+def do_image_add_tag(cs, args):
+ """Add a tag to an image."""
+ # TODO(mattf): Need proper add_tag API call
+ id = args.id or _get_by_id_or_name(cs.images, name=args.name).id
+ cs.images.update_tags(id, cs.images.get(id).tags + [args.tag, ])
+ # TODO(mattf): No indication of result, expect image details
+
+
+@utils.arg('--name',
+ help='Name of the image.')
+@utils.arg('--id',
+ metavar='<image_id>',
+ help='Image to tag.')
+# TODO(mattf): Change --tag to --tag+
+@utils.arg('--tag',
+ metavar='<tag>',
+ required=True,
+ help='Tag to remove.')
+def do_image_remove_tag(cs, args):
+ """Remove a tag from an image."""
+ # TODO(mattf): Need proper remove_tag API call
+ id = args.id or _get_by_id_or_name(cs.images, name=args.name).id
+ cs.images.update_tags(id,
+ filter(lambda x: x != args.tag,
+ cs.images.get(id).tags))
+ # TODO(mattf): No indication of result, expect image details
+
+
+#
+# Clusters
+# ~~~~~~~~
+# cluster-list
+#
+# cluster-show --name <cluster>|--id <cluster_id> [--json]
+#
+# cluster-create [--json <file>]
+#
+# TODO(mattf): cluster-scale
+#
+# cluster-delete --name <cluster>|--id <cluster_id>
+#
+
+def do_cluster_list(cs, args):
+ """Print a list of available clusters."""
+ clusters = cs.clusters.list()
+ for cluster in clusters:
+ cluster.node_count = sum(map(lambda g: g['count'],
+ cluster.node_groups))
+ columns = ('name', 'id', 'status', 'node_count')
+ utils.print_list(clusters, columns)
+
+
+@utils.arg('--name',
+ help='Name of the cluster.')
+@utils.arg('--id',
+ metavar='<cluster_id>',
+ help='ID of the cluster to show.')
+@utils.arg('--json',
+ action='store_true',
+ default=False,
+ help='Print JSON representation of the cluster.')
+def do_cluster_show(cs, args):
+ """Show details of a cluster."""
+ cluster = _get_by_id_or_name(cs.clusters, args.id, args.name)
+ if args.json:
+ print(json.dumps(cluster._info))
+ else:
+ _show_cluster(cluster)
+
+
+@utils.arg('--json',
+ default=sys.stdin,
+ type=argparse.FileType('r'),
+ help='JSON representation of cluster.')
+def do_cluster_create(cs, args):
+ """Create a cluster."""
+ # TODO(mattf): improve template validation, e.g. template w/o name key
+ template = json.loads(args.json.read())
+ # The neutron_management_network parameter to clusters.create is
+ # called net_id. Therefore, we must translate before invoking
+ # create w/ **template. It may be desirable to simple change
+ # clusters.create in the future.
+ template['net_id'] = template.get('neutron_management_network', None)
+ valid_args = inspect.getargspec(cs.clusters.create).args
+ for name in template.keys():
+ if name not in valid_args:
+ # TODO(mattf): make this verbose - bug/1271147
+ del template[name]
+ _show_cluster(cs.clusters.create(**template))
+
+
+@utils.arg('--name',
+ help='Name of the cluster.')
+@utils.arg('--id',
+ metavar='<cluster_id>',
+ help='ID of the cluster to delete.')
+def do_cluster_delete(cs, args):
+ """Delete a cluster."""
+ cs.clusters.delete(
+ args.id or _get_by_id_or_name(cs.clusters, name=args.name).id
+ )
+ # TODO(mattf): No indication of result
+
+
+#
+# Node Group Templates
+# ~~~~~~~~~~~~~~~~~~~~
+# node-group-template-list
+#
+# node-group-template-show --name <template>|--id <template_id> [--json]
+#
+# node-group-template-create [--json <file>]
+#
+# node-group-template-delete --name <template>|--id <template_id>
+#
+
+def do_node_group_template_list(cs, args):
+ """Print a list of available node group templates."""
+ templates = cs.node_group_templates.list()
+ columns = ('name', 'id', 'plugin_name', 'node_processes', 'description')
+ utils.print_list(templates, columns,
+ {'node_processes': _print_list_field('node_processes')})
+
+
+@utils.arg('--name',
+ help='Name of the node group template.')
+@utils.arg('--id',
+ metavar='<template_id>',
+ help='ID of the node group template to show.')
+@utils.arg('--json',
+ action='store_true',
+ default=False,
+ help='Print JSON representation of node group template.')
+def do_node_group_template_show(cs, args):
+ """Show details of a node group template."""
+ template = _get_by_id_or_name(cs.node_group_templates, args.id, args.name)
+ if args.json:
+ print(json.dumps(template._info))
+ else:
+ _show_node_group_template(template)
+
+
+@utils.arg('--json',
+ default=sys.stdin,
+ type=argparse.FileType('r'),
+ help='JSON representation of node group template.')
+def do_node_group_template_create(cs, args):
+ """Create a node group template."""
+ # TODO(mattf): improve template validation, e.g. template w/o name key
+ template = json.loads(args.json.read())
+ valid_args = inspect.getargspec(cs.node_group_templates.create).args
+ for name in template.keys():
+ if name not in valid_args:
+ # TODO(mattf): make this verbose - bug/1271147
+ del template[name]
+ _show_node_group_template(cs.node_group_templates.create(**template))
+
+
+@utils.arg('--name',
+ help='Name of the node group template.')
+@utils.arg('--id',
+ metavar='<template_id>',
+ help='ID of the node group template to delete.')
+def do_node_group_template_delete(cs, args):
+ """Delete a node group template."""
+ cs.node_group_templates.delete(
+ args.id or
+ _get_by_id_or_name(cs.node_group_templates, name=args.name).id
+ )
+ # TODO(mattf): No indication of result
+
+
+#
+# Cluster Templates
+# ~~~~~~~~~~~~~~~~~
+# cluster-template-list
+#
+# cluster-template-show --name <template>|--id <template_id> [--json]
+#
+# cluster-template-create [--json <file>]
+#
+# cluster-template-delete --name <template>|--id <template_id>
+#
+
+def do_cluster_template_list(cs, args):
+ """Print a list of available cluster templates."""
+ templates = cs.cluster_templates.list()
+ columns = ('name', 'id', 'plugin_name', 'node_groups', 'description')
+ utils.print_list(templates, columns,
+ {'node_groups': _print_node_group_field})
+
+
+@utils.arg('--name',
+ help='Name of the cluster template.')
+@utils.arg('--id',
+ metavar='<template_id>',
+ help='ID of the cluster template to show.')
+@utils.arg('--json',
+ action='store_true',
+ default=False,
+ help='Print JSON representation of cluster template.')
+def do_cluster_template_show(cs, args):
+ """Show details of a cluster template."""
+ template = _get_by_id_or_name(cs.cluster_templates, args.id, args.name)
+ if args.json:
+ print(json.dumps(template._info))
+ else:
+ _show_cluster_template(template)
+
+
+@utils.arg('--json',
+ default=sys.stdin,
+ type=argparse.FileType('r'),
+ help='JSON representation of cluster template.')
+def do_cluster_template_create(cs, args):
+ """Create a cluster template."""
+ # TODO(mattf): improve template validation, e.g. template w/o name key
+ template = json.loads(args.json.read())
+ valid_args = inspect.getargspec(cs.cluster_templates.create).args
+ for name in template.keys():
+ if name not in valid_args:
+ # TODO(mattf): make this verbose - bug/1271147
+ del template[name]
+ _show_cluster_template(cs.cluster_templates.create(**template))
+
+
+@utils.arg('--name',
+ help='Name of the cluster template.')
+@utils.arg('--id',
+ metavar='<template_id>',
+ help='ID of the cluster template to delete.')
+def do_cluster_template_delete(cs, args):
+ """Delete a cluster template."""
+ cs.cluster_templates.delete(
+ args.id or _get_by_id_or_name(cs.cluster_templates, name=args.name).id
+ )
+ # TODO(mattf): No indication of result
+
+
+#
+# Data Sources
+# ~~~~~~~~~~~~
+# data-source-list
+#
+# data-source-show --name <name>|--id <id>
+#
+# data-source-create --name <name> --type <type>
+# --url <url>
+# [--user <user> --password <password>]
+# [--description <desc>]
+# NB: user & password if type is swift
+#
+# data-source-delete --name <name>|--id <id>
+#
+
+def do_data_source_list(cs, args):
+ """Print a list of available data sources."""
+ sources = cs.data_sources.list()
+ columns = ('name', 'id', 'type', 'description')
+ utils.print_list(sources, columns)
+
+
+@utils.arg('--name',
+ help='Name of the data source.')
+@utils.arg('--id',
+ help='ID of the data source.')
+def do_data_source_show(cs, args):
+ """Show details of a data source."""
+ _show_data_source(_get_by_id_or_name(cs.data_sources, args.id, args.name))
+
+
+@utils.arg('--name',
+ required=True,
+ help='Name of the data source.')
+@utils.arg('--type',
+ required=True,
+ help='Type of the data source.')
+@utils.arg('--url',
+ required=True,
+ help='URL for the data source.')
+@utils.arg('--description',
+ default='',
+ help='Description of the data source.')
+@utils.arg('--user',
+ default=None,
+ help='Username for accessing the data source URL.')
+@utils.arg('--password',
+ default=None,
+ help='Password for accessing the data source URL.')
+def do_data_source_create(cs, args):
+ """Create a data source that provides job input or receives job output."""
+ _show_data_source(cs.data_sources.create(args.name, args.description,
+ args.type, args.url,
+ args.user, args.password))
+
+
+@utils.arg('--name',
+ help='Name of the data source.')
+@utils.arg('--id',
+ help='ID of data source to delete.')
+def do_data_source_delete(cs, args):
+ """Delete a data source."""
+ cs.data_sources.delete(
+ args.id or _get_by_id_or_name(cs.data_sources, name=args.name).id
+ )
+ # TODO(mattf): No indication of result
+
+
+#
+# Job Binary Internals
+# ~~~~~~~~~~~~~~~~~~~~
+# job-binary-data-list
+#
+# job-binary-data-create [--file <file>]
+#
+# job-binary-data-delete --id <id>
+#
+
+def do_job_binary_data_list(cs, args):
+ """Print a list of internally stored job binary data."""
+ _show_job_binary_data(cs.job_binary_internals.list())
+
+
+@utils.arg('--file',
+ default=sys.stdin,
+ type=argparse.FileType('r'),
+ help='Data to store.')
+def do_job_binary_data_create(cs, args):
+ """Store data in the internal DB.
+ Use 'swift upload' instead of this command.
+ Use this command only if Swift is not available.
+ """
+ # Should be %F-%T except for type validation errors
+ _show_job_binary_data((cs.job_binary_internals.create(
+ datetime.datetime.now().strftime('d%Y%m%d%H%M%S'),
+ args.file.read()),)
+ )
+
+
+@utils.arg('--id',
+ required=True,
+ help='ID of internally stored job binary data.')
+def do_job_binary_data_delete(cs, args):
+ """Delete an internally stored job binary data."""
+ cs.job_binary_internals.delete(args.id)
+ # TODO(mattf): No indication of result
+ # TODO(mattf): Appears no DB constraints for removing data used by job
+
+
+#
+# Job Binaries
+# ~~~~~~~~~~~~
+# job-binary-list
+#
+# job-binary-show --name <name>|--id <id>
+#
+# job-binary-create --name <name> --url <url>
+# [--user <user> --password <password>]
+# [--description <desc>]
+#
+# job-binary-delete --name <name>|--id <id>
+#
+
+def do_job_binary_list(cs, args):
+ """Print a list of job binaries."""
+ binaries = cs.job_binaries.list()
+ columns = ('id', 'name', 'description')
+ utils.print_list(binaries, columns)
+
+
+@utils.arg('--name',
+ help='Name of the job binary.')
+@utils.arg('--id',
+ help='ID of the job binary.')
+def do_job_binary_show(cs, args):
+ """Show details of a job binary."""
+ _show_job_binary(_get_by_id_or_name(cs.job_binaries, args.id, args.name))
+
+
+@utils.arg('--name',
+ required=True,
+ help='Name of the job binary.')
+@utils.arg('--url',
+ required=True,
+ help='URL for the job binary.')
+@utils.arg('--description',
+ default='',
+ help='Description of the job binary.')
+@utils.arg('--user',
+ default=None,
+ help='Username for accessing the job binary URL.')
+@utils.arg('--password',
+ default=None,
+ help='Password for accessing the job binary URL.')
+def do_job_binary_create(cs, args):
+ """Record a job binary."""
+ # TODO(mattf): make credentials consistent w/ data source
+ extra = {}
+ if args.user:
+ extra['user'] = args.user
+ if args.password:
+ extra['password'] = args.password
+ _show_job_binary(cs.job_binaries.create(args.name, args.url,
+ args.description, extra))
+
+
+@utils.arg('--name',
+ help='Name of the job binary.')
+@utils.arg('--id',
+ help='ID of the job binary to delete.')
+def do_job_binary_delete(cs, args):
+ """Delete a job binary."""
+ cs.job_binaries.delete(
+ args.id or _get_by_id_or_name(cs.job_binaries, name=args.name).id
+ )
+ # TODO(mattf): No indication of result
+
+
+#
+# Jobs
+# ~~~~
+# job-template-list
+#
+# job-template-show --name <name>|--id <id>
+#
+# job-template-create --name <name>
+# --type <Pig|Hive|MapReduce|Java|...>
+# [--mains <array of string>]
+# [--libs <array of string>]
+# [--description <desc>]
+#
+# job-template-delete --name <name>|--id <id>
+#
+
+def do_job_template_list(cs, args):
+ """Print a list of job templates."""
+ templates = cs.jobs.list()
+ columns = ('id', 'name', 'description')
+ utils.print_list(templates, columns)
+
+
+@utils.arg('--name',
+ help='Name of the job template.')
+@utils.arg('--id',
+ help='ID of the job template.')
+def do_job_template_show(cs, args):
+ """Show details of a job template."""
+ _show_job_template(_get_by_id_or_name(cs.jobs, args.id, args.name))
+
+
+@utils.arg('--name',
+ required=True,
+ help='Name of the job template.')
+@utils.arg('--type',
+ required=True,
+ help='Type of the job template.')
+@utils.arg('--main',
+ action='append',
+ default=[],
+ help='ID for job\'s main job-binary.')
+@utils.arg('--lib',
+ action='append',
+ default=[],
+ help='ID of job\'s lib job-binary, repeatable.')
+@utils.arg('--description',
+ default='',
+ help='Description of the job template.')
+def do_job_template_create(cs, args):
+ """Create a job template."""
+ _show_job_template(cs.jobs.create(args.name, args.type,
+ args.main, args.lib,
+ args.description))
+
+
+@utils.arg('--name',
+ help='Name of the job template.')
+@utils.arg('--id',
+ help='ID of the job template.')
+def do_job_template_delete(cs, args):
+ """Delete a job template."""
+ cs.jobs.delete(
+ args.id or _get_by_id_or_name(cs.jobs, name=args.name).id
+ )
+ # TODO(mattf): No indication of result
+
+
+#
+# Job Executions
+# ~~~~~~~~~~~~~~
+# job-list
+#
+# job-show --id <id>
+#
+# job-create --job-template <id> --cluster <id>
+# [--input-data <id>] [--output-data <id>]
+# [--param <name=value>]
+# [--arg <arg>]
+# [--config <name=value>]
+#
+# job-delete --id <id>
+#
+
+def do_job_list(cs, args):
+ """Print a list of jobs."""
+ jobs = cs.job_executions.list()
+ for job in jobs:
+ # why is status in info.status?
+ job.status = job.info['status']
+ # TODO(mattf): why can cluster_id be None?
+ columns = ('id', 'cluster_id', 'status')
+ utils.print_list(jobs, columns)
+
+
+@utils.arg('--id',
+ required=True,
+ help='ID of the job.')
+def do_job_show(cs, args):
+ """Show details of a job."""
+ _show_job(cs.job_executions.get(args.id))
+
+
+@utils.arg('--job-template',
+ required=True,
+ help='ID of the job template to run.')
+@utils.arg('--cluster',
+ required=True,
+ help='ID of the cluster to run the job in.')
+@utils.arg('--input-data',
+ default=None,
+ help='ID of the input data source.')
+@utils.arg('--output-data',
+ default=None,
+ help='ID of the output data source.')
+@utils.arg('--param',
+ metavar='name=value',
+ action='append',
+ default=[],
+ help='Parameters to add to the job, repeatable.')
+@utils.arg('--arg',
+ action='append',
+ default=[],
+ help='Arguments to add to the job, repeatable.')
+@utils.arg('--config',
+ metavar='name=value',
+ action='append',
+ default=[],
+ help='Config parameters to add to the job, repeatable.')
+def do_job_create(cs, args):
+ _convert = lambda ls: dict(map(lambda i: i.split('=', 1), ls))
+ _show_job(cs.job_executions.create(args.job_template, args.cluster,
+ args.input_data, args.output_data,
+ {'params': _convert(args.param),
+ 'args': args.arg,
+ 'configs': _convert(args.config)}))
+
+
+@utils.arg('--id',
+ required=True,
+ help='ID of a job.')
+def do_job_delete(cs, args):
+ """Delete a job."""
+ cs.job_executions.delete(args.id)
+ # TODO(mattf): No indication of result