diff options
Diffstat (limited to 'saharaclient/api')
-rw-r--r-- | saharaclient/api/__init__.py | 0 | ||||
-rw-r--r-- | saharaclient/api/base.py | 155 | ||||
-rw-r--r-- | saharaclient/api/client.py | 107 | ||||
-rw-r--r-- | saharaclient/api/cluster_templates.py | 70 | ||||
-rw-r--r-- | saharaclient/api/clusters.py | 74 | ||||
-rw-r--r-- | saharaclient/api/data_sources.py | 47 | ||||
-rw-r--r-- | saharaclient/api/helpers.py | 76 | ||||
-rw-r--r-- | saharaclient/api/httpclient.py | 40 | ||||
-rw-r--r-- | saharaclient/api/images.py | 68 | ||||
-rw-r--r-- | saharaclient/api/job_binaries.py | 50 | ||||
-rw-r--r-- | saharaclient/api/job_binary_internals.py | 38 | ||||
-rw-r--r-- | saharaclient/api/job_executions.py | 50 | ||||
-rw-r--r-- | saharaclient/api/jobs.py | 47 | ||||
-rw-r--r-- | saharaclient/api/node_group_templates.py | 84 | ||||
-rw-r--r-- | saharaclient/api/parameters.py | 26 | ||||
-rw-r--r-- | saharaclient/api/plugins.py | 56 | ||||
-rw-r--r-- | saharaclient/api/shell.py | 778 |
17 files changed, 1766 insertions, 0 deletions
diff --git a/saharaclient/api/__init__.py b/saharaclient/api/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/saharaclient/api/__init__.py diff --git a/saharaclient/api/base.py b/saharaclient/api/base.py new file mode 100644 index 0000000..cf4c493 --- /dev/null +++ b/saharaclient/api/base.py @@ -0,0 +1,155 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import logging +import six + +LOG = logging.getLogger(__name__) + + +class Resource(object): + resource_name = 'Something' + defaults = {} + + def __init__(self, manager, info): + self.manager = manager + info = info.copy() + self._info = info + self._set_defaults(info) + self._add_details(info) + + def _set_defaults(self, info): + for name, value in six.iteritems(self.defaults): + if name not in info: + info[name] = value + + def _add_details(self, info): + for (k, v) in six.iteritems(info): + try: + setattr(self, k, v) + self._info[k] = v + except AttributeError: + # In this case we already defined the attribute on the class + pass + + def __str__(self): + return '%s %s' % (self.resource_name, str(self._info)) + + +def _check_items(obj, searches): + try: + return all(getattr(obj, attr) == value for (attr, value) in searches) + except AttributeError: + return False + + +class ResourceManager(object): + resource_class = None + + def __init__(self, api): + self.api = api + + def find(self, **kwargs): + return [i for i in self.list() if _check_items(i, kwargs.items())] + + def _copy_if_defined(self, data, **kwargs): + for var_name, var_value in six.iteritems(kwargs): + if var_value is not None: + data[var_name] = var_value + + def _create(self, url, data, response_key=None, dump_json=True): + if dump_json: + data = json.dumps(data) + resp = self.api.client.post(url, data) + + if resp.status_code != 202: + self._raise_api_exception(resp) + + if response_key is not None: + data = get_json(resp)[response_key] + else: + data = get_json(resp) + return self.resource_class(self, data) + + def _update(self, url, data, response_key=None, dump_json=True): + if dump_json: + data = json.dumps(data) + resp = self.api.client.put(url, data) + + if resp.status_code != 202: + self._raise_api_exception(resp) + if response_key is not None: + data = get_json(resp)[response_key] + else: + data = get_json(resp) + return self.resource_class(self, data) + + def _list(self, url, response_key): + resp = self.api.client.get(url) + if resp.status_code == 200: + data = get_json(resp)[response_key] + + return [self.resource_class(self, res) + for res in data] + else: + self._raise_api_exception(resp) + + def _get(self, url, response_key=None): + resp = self.api.client.get(url) + + if resp.status_code == 200: + if response_key is not None: + data = get_json(resp)[response_key] + else: + data = get_json(resp) + return self.resource_class(self, data) + else: + self._raise_api_exception(resp) + + def _delete(self, url): + resp = self.api.client.delete(url) + + if resp.status_code != 204: + self._raise_api_exception(resp) + + def _plurify_resource_name(self): + return self.resource_class.resource_name + 's' + + def _raise_api_exception(self, resp): + error_data = get_json(resp) + raise APIException(error_code=error_data.get("error_code"), + error_name=error_data.get("error_name"), + error_message=error_data.get("error_message")) + + +def get_json(response): + """This method provided backward compatibility with old versions + of requests library + + """ + json_field_or_function = getattr(response, 'json', None) + if callable(json_field_or_function): + return response.json() + else: + return json.loads(response.content) + + +class APIException(Exception): + def __init__(self, error_code=None, error_name=None, error_message=None): + super(APIException, self).__init__(error_message) + self.error_code = error_code + self.error_name = error_name + self.error_message = error_message diff --git a/saharaclient/api/client.py b/saharaclient/api/client.py new file mode 100644 index 0000000..bbb701b --- /dev/null +++ b/saharaclient/api/client.py @@ -0,0 +1,107 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from keystoneclient.v2_0 import client as keystone_client_v2 +from keystoneclient.v3 import client as keystone_client_v3 + +from saharaclient.api import cluster_templates +from saharaclient.api import clusters +from saharaclient.api import data_sources +from saharaclient.api import httpclient +from saharaclient.api import images +from saharaclient.api import job_binaries +from saharaclient.api import job_binary_internals +from saharaclient.api import job_executions +from saharaclient.api import jobs +from saharaclient.api import node_group_templates +from saharaclient.api import plugins + + +class Client(object): + def __init__(self, username=None, api_key=None, project_id=None, + project_name=None, auth_url=None, savanna_url=None, + endpoint_type='publicURL', service_type='data_processing', + input_auth_token=None): + + if not input_auth_token: + keystone = self.get_keystone_client(username=username, + api_key=api_key, + auth_url=auth_url, + project_id=project_id, + project_name=project_name) + input_auth_token = keystone.auth_token + if not input_auth_token: + raise RuntimeError("Not Authorized") + + savanna_catalog_url = savanna_url + if not savanna_url: + keystone = self.get_keystone_client(username=username, + api_key=api_key, + auth_url=auth_url, + token=input_auth_token, + project_id=project_id, + project_name=project_name) + catalog = keystone.service_catalog.get_endpoints(service_type) + if service_type in catalog: + for e_type, endpoint in catalog.get(service_type)[0].items(): + if str(e_type).lower() == str(endpoint_type).lower(): + savanna_catalog_url = endpoint + break + if not savanna_catalog_url: + raise RuntimeError("Could not find Sahara endpoint in catalog") + + self.client = httpclient.HTTPClient(savanna_catalog_url, + input_auth_token) + + self.clusters = clusters.ClusterManager(self) + self.cluster_templates = cluster_templates.ClusterTemplateManager(self) + self.node_group_templates = (node_group_templates. + NodeGroupTemplateManager(self)) + self.plugins = plugins.PluginManager(self) + self.images = images.ImageManager(self) + + self.data_sources = data_sources.DataSourceManager(self) + self.jobs = jobs.JobsManager(self) + self.job_executions = job_executions.JobExecutionsManager(self) + self.job_binaries = job_binaries.JobBinariesManager(self) + self.job_binary_internals =\ + job_binary_internals.JobBinaryInternalsManager(self) + + def get_keystone_client(self, username=None, api_key=None, auth_url=None, + token=None, project_id=None, project_name=None): + if not auth_url: + raise RuntimeError("No auth url specified") + imported_client = keystone_client_v2 if "v2.0" in auth_url\ + else keystone_client_v3 + if not getattr(self, "keystone_client", None): + self.keystone_client = imported_client.Client( + username=username, + password=api_key, + token=token, + tenant_id=project_id, + tenant_name=project_name, + auth_url=auth_url, + endpoint=auth_url) + + self.keystone_client.authenticate() + + return self.keystone_client + + @staticmethod + def get_projects_list(keystone_client): + if isinstance(keystone_client, keystone_client_v2.Client): + return keystone_client.tenants + + return keystone_client.projects diff --git a/saharaclient/api/cluster_templates.py b/saharaclient/api/cluster_templates.py new file mode 100644 index 0000000..f9ca583 --- /dev/null +++ b/saharaclient/api/cluster_templates.py @@ -0,0 +1,70 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from saharaclient.api import base + + +class ClusterTemplate(base.Resource): + resource_name = 'Cluster Template' + + +class ClusterTemplateManager(base.ResourceManager): + resource_class = ClusterTemplate + + def _assign_field(self, name, plugin_name, hadoop_version, + description=None, cluster_configs=None, node_groups=None, + anti_affinity=None, net_id=None): + data = { + 'name': name, + 'plugin_name': plugin_name, + 'hadoop_version': hadoop_version, + } + + self._copy_if_defined(data, + description=description, + cluster_configs=cluster_configs, + node_groups=node_groups, + anti_affinity=anti_affinity, + neutron_management_network=net_id) + return data + + def create(self, name, plugin_name, hadoop_version, description=None, + cluster_configs=None, node_groups=None, anti_affinity=None, + net_id=None): + data = self._assign_field(name, plugin_name, hadoop_version, + description, cluster_configs, node_groups, + anti_affinity, net_id) + + return self._create('/cluster-templates', data, 'cluster_template') + + def update(self, cluster_template_id, name, plugin_name, hadoop_version, + description=None, cluster_configs=None, node_groups=None, + anti_affinity=None, net_id=None): + data = self._assign_field(name, plugin_name, hadoop_version, + description, cluster_configs, node_groups, + anti_affinity, net_id) + + return self._update('/cluster-templates/%s' % cluster_template_id, + data, 'cluster_template') + + def list(self): + return self._list('/cluster-templates', 'cluster_templates') + + def get(self, cluster_template_id): + return self._get('/cluster-templates/%s' % cluster_template_id, + 'cluster_template') + + def delete(self, cluster_template_id): + self._delete('/cluster-templates/%s' % cluster_template_id) diff --git a/saharaclient/api/clusters.py b/saharaclient/api/clusters.py new file mode 100644 index 0000000..ec9a822 --- /dev/null +++ b/saharaclient/api/clusters.py @@ -0,0 +1,74 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import six + +from saharaclient.api import base + + +class Cluster(base.Resource): + resource_name = 'Cluster' + + +class ClusterManager(base.ResourceManager): + resource_class = Cluster + + def _assert_variables(self, **kwargs): + for var_name, var_value in six.iteritems(kwargs): + if var_value is None: + raise base.APIException('Cluster is missing field "%s"' % + var_name) + + def create(self, name, plugin_name, hadoop_version, + cluster_template_id=None, default_image_id=None, + is_transient=None, description=None, cluster_configs=None, + node_groups=None, user_keypair_id=None, + anti_affinity=None, net_id=None): + + data = { + 'name': name, + 'plugin_name': plugin_name, + 'hadoop_version': hadoop_version, + } + + if cluster_template_id is None: + self._assert_variables(default_image_id=default_image_id, + cluster_configs=cluster_configs, + node_groups=node_groups) + + self._copy_if_defined(data, + cluster_template_id=cluster_template_id, + is_transient=is_transient, + default_image_id=default_image_id, + description=description, + cluster_configs=cluster_configs, + node_groups=node_groups, + user_keypair_id=user_keypair_id, + anti_affinity=anti_affinity, + neutron_management_network=net_id) + + return self._create('/clusters', data, 'cluster') + + def scale(self, cluster_id, scale_object): + return self._update('/clusters/%s' % cluster_id, scale_object) + + def list(self): + return self._list('/clusters', 'clusters') + + def get(self, cluster_id): + return self._get('/clusters/%s' % cluster_id, 'cluster') + + def delete(self, cluster_id): + self._delete('/clusters/%s' % cluster_id) diff --git a/saharaclient/api/data_sources.py b/saharaclient/api/data_sources.py new file mode 100644 index 0000000..7ba0059 --- /dev/null +++ b/saharaclient/api/data_sources.py @@ -0,0 +1,47 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from saharaclient.api import base + + +class DataSources(base.Resource): + resource_name = 'Data Source' + + +class DataSourceManager(base.ResourceManager): + resource_class = DataSources + + def create(self, name, description, data_source_type, + url, credential_user=None, credential_pass=None): + data = { + 'name': name, + 'description': description, + 'type': data_source_type, + 'url': url, + 'credentials': {} + } + self._copy_if_defined(data['credentials'], + user=credential_user, + password=credential_pass) + return self._create('/data-sources', data, 'data_source') + + def list(self): + return self._list('/data-sources', 'data_sources') + + def get(self, data_source_id): + return self._get('/data-sources/%s' % data_source_id, 'data_source') + + def delete(self, data_source_id): + self._delete('/data-sources/%s' % data_source_id) diff --git a/saharaclient/api/helpers.py b/saharaclient/api/helpers.py new file mode 100644 index 0000000..79495c9 --- /dev/null +++ b/saharaclient/api/helpers.py @@ -0,0 +1,76 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from saharaclient.api import parameters as params + + +class Helpers(object): + def __init__(self, savanna_client): + self.savanna = savanna_client + self.plugins = self.savanna.plugins + + def _get_node_processes(self, plugin): + processes = [] + for proc_lst in plugin.node_processes.values(): + processes += proc_lst + + return [(proc_name, proc_name) for proc_name in processes] + + def get_node_processes(self, plugin_name, hadoop_version): + plugin = self.plugins.get_version_details(plugin_name, hadoop_version) + + return self._get_node_processes(plugin) + + def _extract_parameters(self, configs, scope, applicable_target): + parameters = [] + for config in configs: + if (config['scope'] == scope and + config['applicable_target'] == applicable_target): + + parameters.append(params.Parameter(config)) + + return parameters + + def get_cluster_general_configs(self, plugin_name, hadoop_version): + plugin = self.plugins.get_version_details(plugin_name, hadoop_version) + + return self._extract_parameters(plugin.configs, 'cluster', "general") + + def get_general_node_group_configs(self, plugin_name, hadoop_version): + plugin = self.plugins.get_version_details(plugin_name, hadoop_version) + + return self._extract_parameters(plugin.configs, 'node', 'general') + + def get_targeted_node_group_configs(self, plugin_name, hadoop_version): + plugin = self.plugins.get_version_details(plugin_name, hadoop_version) + + parameters = dict() + + for service in plugin.node_processes.keys(): + parameters[service] = self._extract_parameters(plugin.configs, + 'node', service) + + return parameters + + def get_targeted_cluster_configs(self, plugin_name, hadoop_version): + plugin = self.plugins.get_version_details(plugin_name, hadoop_version) + + parameters = dict() + + for service in plugin.node_processes.keys(): + parameters[service] = self._extract_parameters(plugin.configs, + 'cluster', service) + + return parameters diff --git a/saharaclient/api/httpclient.py b/saharaclient/api/httpclient.py new file mode 100644 index 0000000..946e9c3 --- /dev/null +++ b/saharaclient/api/httpclient.py @@ -0,0 +1,40 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import requests + + +class HTTPClient(object): + def __init__(self, base_url, token): + self.base_url = base_url + self.token = token + + def get(self, url): + return requests.get(self.base_url + url, + headers={'x-auth-token': self.token}) + + def post(self, url, body): + return requests.post(self.base_url + url, body, + headers={'x-auth-token': self.token, + 'content-type': 'application/json'}) + + def put(self, url, body): + return requests.put(self.base_url + url, body, + headers={'x-auth-token': self.token, + 'content-type': 'application/json'}) + + def delete(self, url): + return requests.delete(self.base_url + url, + headers={'x-auth-token': self.token}) diff --git a/saharaclient/api/images.py b/saharaclient/api/images.py new file mode 100644 index 0000000..a705e15 --- /dev/null +++ b/saharaclient/api/images.py @@ -0,0 +1,68 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json + +from saharaclient.api import base + + +class Image(base.Resource): + resource_name = 'Image' + defaults = {'description': ''} + + +class ImageManager(base.ResourceManager): + resource_class = Image + + def list(self): + return self._list('/images', 'images') + + def get(self, id): + return self._get('/images/%s' % id, 'image') + + def unregister_image(self, image_id): + self._delete('/images/%s' % image_id) + + def update_image(self, image_id, user_name, desc): + body = {"username": user_name, + "description": desc} + + resp = self.api.client.post('/images/%s' % image_id, json.dumps(body)) + if resp.status_code != 202: + raise RuntimeError('Failed to register image %s' % image_id) + + def update_tags(self, image_id, new_tags): + old_image = self.get(image_id) + + old_tags = frozenset(old_image.tags) + new_tags = frozenset(new_tags) + + to_add = list(new_tags - old_tags) + to_remove = list(old_tags - new_tags) + + if len(to_add) != 0: + resp = self.api.client.post('/images/%s/tag' % image_id, + json.dumps({'tags': to_add})) + + if resp.status_code != 202: + raise RuntimeError('Failed to add tags to image %s' % image_id) + + if len(to_remove) != 0: + resp = self.api.client.post('/images/%s/untag' % image_id, + json.dumps({'tags': to_remove})) + + if resp.status_code != 202: + raise RuntimeError('Failed to remove tags from image %s' % + image_id) diff --git a/saharaclient/api/job_binaries.py b/saharaclient/api/job_binaries.py new file mode 100644 index 0000000..a1e4b9e --- /dev/null +++ b/saharaclient/api/job_binaries.py @@ -0,0 +1,50 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from saharaclient.api import base + + +class JobBinaries(base.Resource): + resource_name = 'Job Binary' + + +class JobBinariesManager(base.ResourceManager): + resource_class = JobBinaries + + def create(self, name, url, description, extra): + data = { + "name": name, + "url": url, + "description": description, + "extra": extra + } + + return self._create('/job-binaries', data, 'job_binary') + + def list(self): + return self._list('/job-binaries', 'binaries') + + def get(self, job_binary_id): + return self._get('/job-binaries/%s' % job_binary_id, 'job_binary') + + def delete(self, job_binary_id): + self._delete('/job-binaries/%s' % job_binary_id) + + def get_file(self, job_binary_id): + resp = self.api.client.get('/job-binaries/%s/data' % job_binary_id) + + if resp.status_code != 200: + self._raise_api_exception(resp) + return resp.content diff --git a/saharaclient/api/job_binary_internals.py b/saharaclient/api/job_binary_internals.py new file mode 100644 index 0000000..b90d478 --- /dev/null +++ b/saharaclient/api/job_binary_internals.py @@ -0,0 +1,38 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from saharaclient.api import base + + +class JobBinaryInternal(base.Resource): + resource_name = 'JobBinaryInternal' + + +class JobBinaryInternalsManager(base.ResourceManager): + resource_class = JobBinaryInternal + + def create(self, name, data): + return self._update('/job-binary-internals/%s' % name, data, + 'job_binary_internal', dump_json=False) + + def list(self): + return self._list('/job-binary-internals', 'binaries') + + def get(self, job_binary_id): + return self._get('/job-binary-internals/%s' % job_binary_id, + 'job_binary_internal') + + def delete(self, job_binary_id): + self._delete('/job-binary-internals/%s' % job_binary_id) diff --git a/saharaclient/api/job_executions.py b/saharaclient/api/job_executions.py new file mode 100644 index 0000000..68ef4e6 --- /dev/null +++ b/saharaclient/api/job_executions.py @@ -0,0 +1,50 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from saharaclient.api import base + + +class JobExecution(base.Resource): + resource_name = 'JobExecution' + + +class JobExecutionsManager(base.ResourceManager): + resource_class = JobExecution + + def list(self): + return self._list('/job-executions', 'job_executions') + + def get(self, obj_id): + return self._get('/job-executions/%s' % obj_id, 'job_execution') + + def delete(self, obj_id): + self._delete('/job-executions/%s' % obj_id) + + def create(self, job_id, cluster_id, input_id, output_id, configs): + url = "/jobs/%s/execute" % job_id + data = { + "cluster_id": cluster_id, + "job_configs": configs + } + + # Leave these out if they are null. For Java job types they + # are not part of the schema + io_ids = (("input_id", input_id), + ("output_id", output_id)) + for key, value in io_ids: + if value is not None: + data.update({key: value}) + + return self._create(url, data, 'job_execution') diff --git a/saharaclient/api/jobs.py b/saharaclient/api/jobs.py new file mode 100644 index 0000000..d3231eb --- /dev/null +++ b/saharaclient/api/jobs.py @@ -0,0 +1,47 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from saharaclient.api import base + + +class Job(base.Resource): + resource_name = 'Job' + + +class JobsManager(base.ResourceManager): + resource_class = Job + + def create(self, name, type, mains, libs, description): + data = { + 'name': name, + 'type': type, + 'description': description, + 'mains': mains, + 'libs': libs + } + + return self._create('/jobs', data, 'job') + + def list(self): + return self._list('/jobs', 'jobs') + + def get(self, job_id): + return self._get('/jobs/%s' % job_id, 'job') + + def get_configs(self, job_type): + return self._get('/jobs/config-hints/%s' % job_type) + + def delete(self, job_id): + self._delete('/jobs/%s' % job_id) diff --git a/saharaclient/api/node_group_templates.py b/saharaclient/api/node_group_templates.py new file mode 100644 index 0000000..f9ae36a --- /dev/null +++ b/saharaclient/api/node_group_templates.py @@ -0,0 +1,84 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from saharaclient.api import base + + +class NodeGroupTemplate(base.Resource): + resource_name = 'Node Group Template' + + +class NodeGroupTemplateManager(base.ResourceManager): + resource_class = NodeGroupTemplate + + def _assign_field(self, name, plugin_name, hadoop_version, flavor_id, + description=None, + volumes_per_node=None, volumes_size=None, + node_processes=None, node_configs=None, + floating_ip_pool=None): + + data = { + 'name': name, + 'plugin_name': plugin_name, + 'hadoop_version': hadoop_version, + 'flavor_id': flavor_id, + 'node_processes': node_processes + } + + self._copy_if_defined(data, + description=description, + node_configs=node_configs, + floating_ip_pool=floating_ip_pool) + + if volumes_per_node: + data.update({"volumes_per_node": volumes_per_node, + "volumes_size": volumes_size}) + + return data + + def create(self, name, plugin_name, hadoop_version, flavor_id, + description=None, volumes_per_node=None, volumes_size=None, + node_processes=None, node_configs=None, floating_ip_pool=None): + + data = self._assign_field(name, plugin_name, hadoop_version, flavor_id, + description, volumes_per_node, volumes_size, + node_processes, node_configs, + floating_ip_pool) + + return self._create('/node-group-templates', data, + 'node_group_template') + + def update(self, ng_template_id, name, plugin_name, hadoop_version, + flavor_id, description=None, volumes_per_node=None, + volumes_size=None, node_processes=None, + node_configs=None, floating_ip_pool=None): + + data = self._assign_field(name, plugin_name, hadoop_version, flavor_id, + description, volumes_per_node, + volumes_size, node_processes, + node_configs, floating_ip_pool) + + return self._update('/node-group-templates/%s' % ng_template_id, data, + 'node_group_template') + + def list(self): + return self._list('/node-group-templates', 'node_group_templates') + + def get(self, ng_template_id): + return self._get('/node-group-templates/%s' % ng_template_id, + 'node_group_template') + + def delete(self, ng_template_id): + self._delete('/node-group-templates/%s' % ng_template_id) diff --git a/saharaclient/api/parameters.py b/saharaclient/api/parameters.py new file mode 100644 index 0000000..4575b56 --- /dev/null +++ b/saharaclient/api/parameters.py @@ -0,0 +1,26 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class Parameter(object): + """This bean is used for building config entries.""" + def __init__(self, config): + self.name = config['name'] + self.description = config.get('description', "No description") + self.required = not config['is_optional'] + self.default_value = config.get('default_value', None) + self.initial_value = self.default_value + self.param_type = config['config_type'] + self.priority = int(config.get('priority', 2)) diff --git a/saharaclient/api/plugins.py b/saharaclient/api/plugins.py new file mode 100644 index 0000000..967da39 --- /dev/null +++ b/saharaclient/api/plugins.py @@ -0,0 +1,56 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from six.moves.urllib import parse as urlparse + +from saharaclient.api import base + + +class Plugin(base.Resource): + resource_name = 'Plugin' + + def __init__(self, manager, info): + base.Resource.__init__(self, manager, info) + + # Horizon requires each object in table to have an id + self.id = self.name + + +class PluginManager(base.ResourceManager): + resource_class = Plugin + + def list(self): + return self._list('/plugins', 'plugins') + + def get(self, plugin_name): + return self._get('/plugins/%s' % plugin_name, 'plugin') + + def get_version_details(self, plugin_name, hadoop_version): + return self._get('/plugins/%s/%s' % (plugin_name, hadoop_version), + 'plugin') + + def convert_to_cluster_template(self, plugin_name, hadoop_version, + template_name, filecontent): + resp = self.api.client.post('/plugins/%s/%s/convert-config/%s' % + (plugin_name, + hadoop_version, + urlparse.quote(template_name)), + filecontent) + if resp.status_code != 202: + raise RuntimeError('Failed to upload template file for plugin "%s"' + ' and version "%s"' % + (plugin_name, hadoop_version)) + else: + return base.get_json(resp)['cluster_template'] diff --git a/saharaclient/api/shell.py b/saharaclient/api/shell.py new file mode 100644 index 0000000..20f2269 --- /dev/null +++ b/saharaclient/api/shell.py @@ -0,0 +1,778 @@ +# Copyright 2013 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import argparse +import datetime +import inspect +import json +from saharaclient.nova import utils +from saharaclient.openstack.common.apiclient import exceptions +import sys + + +def _print_list_field(field): + return lambda obj: ', '.join(getattr(obj, field)) + + +def _print_node_group_field(cluster): + return ', '.join(map(lambda x: ': '.join(x), + [(node_group['name'], + str(node_group['count'])) + for node_group in cluster.node_groups])) + + +def _show_node_group_template(template): + template._info['node_processes'] = ( + ', '.join(template._info['node_processes']) + ) + utils.print_dict(template._info) + + +def _show_cluster_template(template): + template._info['node_groups'] = _print_node_group_field(template) + utils.print_dict(template._info) + + +def _show_cluster(cluster): + # TODO(mattf): Make this pretty, e.g format node_groups and info urls + # Forcing wrap=47 allows for clean display on a terminal of width 80 + utils.print_dict(cluster._info, wrap=47) + + +def _show_job_binary_data(data): + columns = ('id', 'name') + utils.print_list(data, columns) + + +def _show_data_source(source): + # TODO(mattf): why are we passing credentials around like this? + if 'credentials' in source._info: + del source._info['credentials'] + utils.print_dict(source._info) + + +def _show_job_binary(binary): + # TODO(mattf): why are we passing credentials around like this? + if 'extra' in binary._info: + del binary._info['extra'] + utils.print_dict(binary._info) + + +def _show_job_template(template): + # TODO(mattf): Make "mains" property pretty + # TODO(mattf): handle/remove "extra" creds + utils.print_dict(template._info) + + +def _show_job(job): + # TODO(mattf): make display of info pretty, until then + # extract the important status information + job._info['status'] = job._info['info']['status'] + del job._info['info'] + utils.print_dict(job._info) + + +def _get_by_id_or_name(manager, id=None, name=None): + if not (name or id): + raise exceptions.CommandError("either NAME or ID is required") + if id: + return manager.get(id) + ls = manager.find(name=name) + if len(ls) == 0: + raise exceptions.CommandError("%s '%s' not found" % + (manager.resource_class.resource_name, + name)) + elif len(ls) > 1: + raise exceptions.CommandError("%s '%s' not unique, try by ID" % + (manager.resource_class.resource_name, + name)) + return ls[0] + + +# +# Plugins +# ~~~~~~~ +# plugin-list +# +# plugin-show --name <plugin> [--version <version>] +# + +def do_plugin_list(cs, args): + """Print a list of available plugins.""" + plugins = cs.plugins.list() + columns = ('name', 'versions', 'title') + utils.print_list(plugins, columns, + {'versions': _print_list_field('versions')}) + + +@utils.arg('--name', + metavar='<plugin>', + required=True, + help='Name of the plugin.') +# TODO(mattf) - saharaclient does not support query w/ version +#@utils.arg('--version', +# metavar='<version>', +# help='Optional version') +def do_plugin_show(cs, args): + """Show details of a plugin.""" + plugin = cs.plugins.get(args.name) + plugin._info['versions'] = ', '.join(plugin._info['versions']) + utils.print_dict(plugin._info) + + +# +# Image Registry +# ~~~~~~~~~~~~~~ +# image-list [--tag <tag>]* +# +# image-show --name <image>|--id <image_id> +# +# image-register --name <image>|--id <image_id> +# [--username <name>] [--description <desc>] +# +# image-unregister --name <image>|--id <image_id> +# +# image-add-tag --name <image>|--id <image_id> --tag <tag>+ +# +# image-remove-tag --name <image>|--id <image_id> --tag <tag>+ +# + +# TODO(mattf): [--tag <tag>]* +def do_image_list(cs, args): + """Print a list of available images.""" + images = cs.images.list() + columns = ('name', 'id', 'username', 'tags', 'description') + utils.print_list(images, columns, {'tags': _print_list_field('tags')}) + + +@utils.arg('--name', + help='Name of the image.') +@utils.arg('--id', + metavar='<image_id>', + help='ID of the image.') +def do_image_show(cs, args): + """Show details of an image.""" + image = _get_by_id_or_name(cs.images, args.id, args.name) + del image._info['metadata'] + image._info['tags'] = ', '.join(image._info['tags']) + utils.print_dict(image._info) + + +# TODO(mattf): Add --name, must lookup in glance index +@utils.arg('--id', + metavar='<image_id>', + required=True, + help='ID of image, run "glance image-list" to see all IDs.') +@utils.arg('--username', + default='root', + metavar='<name>', + help='Username of privileged user in the image.') +@utils.arg('--description', + default='', + metavar='<desc>', + help='Description of the image.') +def do_image_register(cs, args): + """Register an image from the Image index.""" + # TODO(mattf): image register should not be through update + cs.images.update_image(args.id, args.username, args.description) + # TODO(mattf): No indication of result, expect image details + + +@utils.arg('--name', + help='Name of the image.') +@utils.arg('--id', + metavar='<image_id>', + help='ID of image to unregister.') +def do_image_unregister(cs, args): + """Unregister an image.""" + cs.images.unregister_image( + args.id or _get_by_id_or_name(cs.images, name=args.name).id + ) + # TODO(mattf): No indication of result, expect result to display + + +@utils.arg('--name', + help='Name of the image.') +@utils.arg('--id', + metavar='<image_id>', + help='ID of image to tag.') +# TODO(mattf): Change --tag to --tag+ +@utils.arg('--tag', + metavar='<tag>', + required=True, + help='Tag to add.') +def do_image_add_tag(cs, args): + """Add a tag to an image.""" + # TODO(mattf): Need proper add_tag API call + id = args.id or _get_by_id_or_name(cs.images, name=args.name).id + cs.images.update_tags(id, cs.images.get(id).tags + [args.tag, ]) + # TODO(mattf): No indication of result, expect image details + + +@utils.arg('--name', + help='Name of the image.') +@utils.arg('--id', + metavar='<image_id>', + help='Image to tag.') +# TODO(mattf): Change --tag to --tag+ +@utils.arg('--tag', + metavar='<tag>', + required=True, + help='Tag to remove.') +def do_image_remove_tag(cs, args): + """Remove a tag from an image.""" + # TODO(mattf): Need proper remove_tag API call + id = args.id or _get_by_id_or_name(cs.images, name=args.name).id + cs.images.update_tags(id, + filter(lambda x: x != args.tag, + cs.images.get(id).tags)) + # TODO(mattf): No indication of result, expect image details + + +# +# Clusters +# ~~~~~~~~ +# cluster-list +# +# cluster-show --name <cluster>|--id <cluster_id> [--json] +# +# cluster-create [--json <file>] +# +# TODO(mattf): cluster-scale +# +# cluster-delete --name <cluster>|--id <cluster_id> +# + +def do_cluster_list(cs, args): + """Print a list of available clusters.""" + clusters = cs.clusters.list() + for cluster in clusters: + cluster.node_count = sum(map(lambda g: g['count'], + cluster.node_groups)) + columns = ('name', 'id', 'status', 'node_count') + utils.print_list(clusters, columns) + + +@utils.arg('--name', + help='Name of the cluster.') +@utils.arg('--id', + metavar='<cluster_id>', + help='ID of the cluster to show.') +@utils.arg('--json', + action='store_true', + default=False, + help='Print JSON representation of the cluster.') +def do_cluster_show(cs, args): + """Show details of a cluster.""" + cluster = _get_by_id_or_name(cs.clusters, args.id, args.name) + if args.json: + print(json.dumps(cluster._info)) + else: + _show_cluster(cluster) + + +@utils.arg('--json', + default=sys.stdin, + type=argparse.FileType('r'), + help='JSON representation of cluster.') +def do_cluster_create(cs, args): + """Create a cluster.""" + # TODO(mattf): improve template validation, e.g. template w/o name key + template = json.loads(args.json.read()) + # The neutron_management_network parameter to clusters.create is + # called net_id. Therefore, we must translate before invoking + # create w/ **template. It may be desirable to simple change + # clusters.create in the future. + template['net_id'] = template.get('neutron_management_network', None) + valid_args = inspect.getargspec(cs.clusters.create).args + for name in template.keys(): + if name not in valid_args: + # TODO(mattf): make this verbose - bug/1271147 + del template[name] + _show_cluster(cs.clusters.create(**template)) + + +@utils.arg('--name', + help='Name of the cluster.') +@utils.arg('--id', + metavar='<cluster_id>', + help='ID of the cluster to delete.') +def do_cluster_delete(cs, args): + """Delete a cluster.""" + cs.clusters.delete( + args.id or _get_by_id_or_name(cs.clusters, name=args.name).id + ) + # TODO(mattf): No indication of result + + +# +# Node Group Templates +# ~~~~~~~~~~~~~~~~~~~~ +# node-group-template-list +# +# node-group-template-show --name <template>|--id <template_id> [--json] +# +# node-group-template-create [--json <file>] +# +# node-group-template-delete --name <template>|--id <template_id> +# + +def do_node_group_template_list(cs, args): + """Print a list of available node group templates.""" + templates = cs.node_group_templates.list() + columns = ('name', 'id', 'plugin_name', 'node_processes', 'description') + utils.print_list(templates, columns, + {'node_processes': _print_list_field('node_processes')}) + + +@utils.arg('--name', + help='Name of the node group template.') +@utils.arg('--id', + metavar='<template_id>', + help='ID of the node group template to show.') +@utils.arg('--json', + action='store_true', + default=False, + help='Print JSON representation of node group template.') +def do_node_group_template_show(cs, args): + """Show details of a node group template.""" + template = _get_by_id_or_name(cs.node_group_templates, args.id, args.name) + if args.json: + print(json.dumps(template._info)) + else: + _show_node_group_template(template) + + +@utils.arg('--json', + default=sys.stdin, + type=argparse.FileType('r'), + help='JSON representation of node group template.') +def do_node_group_template_create(cs, args): + """Create a node group template.""" + # TODO(mattf): improve template validation, e.g. template w/o name key + template = json.loads(args.json.read()) + valid_args = inspect.getargspec(cs.node_group_templates.create).args + for name in template.keys(): + if name not in valid_args: + # TODO(mattf): make this verbose - bug/1271147 + del template[name] + _show_node_group_template(cs.node_group_templates.create(**template)) + + +@utils.arg('--name', + help='Name of the node group template.') +@utils.arg('--id', + metavar='<template_id>', + help='ID of the node group template to delete.') +def do_node_group_template_delete(cs, args): + """Delete a node group template.""" + cs.node_group_templates.delete( + args.id or + _get_by_id_or_name(cs.node_group_templates, name=args.name).id + ) + # TODO(mattf): No indication of result + + +# +# Cluster Templates +# ~~~~~~~~~~~~~~~~~ +# cluster-template-list +# +# cluster-template-show --name <template>|--id <template_id> [--json] +# +# cluster-template-create [--json <file>] +# +# cluster-template-delete --name <template>|--id <template_id> +# + +def do_cluster_template_list(cs, args): + """Print a list of available cluster templates.""" + templates = cs.cluster_templates.list() + columns = ('name', 'id', 'plugin_name', 'node_groups', 'description') + utils.print_list(templates, columns, + {'node_groups': _print_node_group_field}) + + +@utils.arg('--name', + help='Name of the cluster template.') +@utils.arg('--id', + metavar='<template_id>', + help='ID of the cluster template to show.') +@utils.arg('--json', + action='store_true', + default=False, + help='Print JSON representation of cluster template.') +def do_cluster_template_show(cs, args): + """Show details of a cluster template.""" + template = _get_by_id_or_name(cs.cluster_templates, args.id, args.name) + if args.json: + print(json.dumps(template._info)) + else: + _show_cluster_template(template) + + +@utils.arg('--json', + default=sys.stdin, + type=argparse.FileType('r'), + help='JSON representation of cluster template.') +def do_cluster_template_create(cs, args): + """Create a cluster template.""" + # TODO(mattf): improve template validation, e.g. template w/o name key + template = json.loads(args.json.read()) + valid_args = inspect.getargspec(cs.cluster_templates.create).args + for name in template.keys(): + if name not in valid_args: + # TODO(mattf): make this verbose - bug/1271147 + del template[name] + _show_cluster_template(cs.cluster_templates.create(**template)) + + +@utils.arg('--name', + help='Name of the cluster template.') +@utils.arg('--id', + metavar='<template_id>', + help='ID of the cluster template to delete.') +def do_cluster_template_delete(cs, args): + """Delete a cluster template.""" + cs.cluster_templates.delete( + args.id or _get_by_id_or_name(cs.cluster_templates, name=args.name).id + ) + # TODO(mattf): No indication of result + + +# +# Data Sources +# ~~~~~~~~~~~~ +# data-source-list +# +# data-source-show --name <name>|--id <id> +# +# data-source-create --name <name> --type <type> +# --url <url> +# [--user <user> --password <password>] +# [--description <desc>] +# NB: user & password if type is swift +# +# data-source-delete --name <name>|--id <id> +# + +def do_data_source_list(cs, args): + """Print a list of available data sources.""" + sources = cs.data_sources.list() + columns = ('name', 'id', 'type', 'description') + utils.print_list(sources, columns) + + +@utils.arg('--name', + help='Name of the data source.') +@utils.arg('--id', + help='ID of the data source.') +def do_data_source_show(cs, args): + """Show details of a data source.""" + _show_data_source(_get_by_id_or_name(cs.data_sources, args.id, args.name)) + + +@utils.arg('--name', + required=True, + help='Name of the data source.') +@utils.arg('--type', + required=True, + help='Type of the data source.') +@utils.arg('--url', + required=True, + help='URL for the data source.') +@utils.arg('--description', + default='', + help='Description of the data source.') +@utils.arg('--user', + default=None, + help='Username for accessing the data source URL.') +@utils.arg('--password', + default=None, + help='Password for accessing the data source URL.') +def do_data_source_create(cs, args): + """Create a data source that provides job input or receives job output.""" + _show_data_source(cs.data_sources.create(args.name, args.description, + args.type, args.url, + args.user, args.password)) + + +@utils.arg('--name', + help='Name of the data source.') +@utils.arg('--id', + help='ID of data source to delete.') +def do_data_source_delete(cs, args): + """Delete a data source.""" + cs.data_sources.delete( + args.id or _get_by_id_or_name(cs.data_sources, name=args.name).id + ) + # TODO(mattf): No indication of result + + +# +# Job Binary Internals +# ~~~~~~~~~~~~~~~~~~~~ +# job-binary-data-list +# +# job-binary-data-create [--file <file>] +# +# job-binary-data-delete --id <id> +# + +def do_job_binary_data_list(cs, args): + """Print a list of internally stored job binary data.""" + _show_job_binary_data(cs.job_binary_internals.list()) + + +@utils.arg('--file', + default=sys.stdin, + type=argparse.FileType('r'), + help='Data to store.') +def do_job_binary_data_create(cs, args): + """Store data in the internal DB. + Use 'swift upload' instead of this command. + Use this command only if Swift is not available. + """ + # Should be %F-%T except for type validation errors + _show_job_binary_data((cs.job_binary_internals.create( + datetime.datetime.now().strftime('d%Y%m%d%H%M%S'), + args.file.read()),) + ) + + +@utils.arg('--id', + required=True, + help='ID of internally stored job binary data.') +def do_job_binary_data_delete(cs, args): + """Delete an internally stored job binary data.""" + cs.job_binary_internals.delete(args.id) + # TODO(mattf): No indication of result + # TODO(mattf): Appears no DB constraints for removing data used by job + + +# +# Job Binaries +# ~~~~~~~~~~~~ +# job-binary-list +# +# job-binary-show --name <name>|--id <id> +# +# job-binary-create --name <name> --url <url> +# [--user <user> --password <password>] +# [--description <desc>] +# +# job-binary-delete --name <name>|--id <id> +# + +def do_job_binary_list(cs, args): + """Print a list of job binaries.""" + binaries = cs.job_binaries.list() + columns = ('id', 'name', 'description') + utils.print_list(binaries, columns) + + +@utils.arg('--name', + help='Name of the job binary.') +@utils.arg('--id', + help='ID of the job binary.') +def do_job_binary_show(cs, args): + """Show details of a job binary.""" + _show_job_binary(_get_by_id_or_name(cs.job_binaries, args.id, args.name)) + + +@utils.arg('--name', + required=True, + help='Name of the job binary.') +@utils.arg('--url', + required=True, + help='URL for the job binary.') +@utils.arg('--description', + default='', + help='Description of the job binary.') +@utils.arg('--user', + default=None, + help='Username for accessing the job binary URL.') +@utils.arg('--password', + default=None, + help='Password for accessing the job binary URL.') +def do_job_binary_create(cs, args): + """Record a job binary.""" + # TODO(mattf): make credentials consistent w/ data source + extra = {} + if args.user: + extra['user'] = args.user + if args.password: + extra['password'] = args.password + _show_job_binary(cs.job_binaries.create(args.name, args.url, + args.description, extra)) + + +@utils.arg('--name', + help='Name of the job binary.') +@utils.arg('--id', + help='ID of the job binary to delete.') +def do_job_binary_delete(cs, args): + """Delete a job binary.""" + cs.job_binaries.delete( + args.id or _get_by_id_or_name(cs.job_binaries, name=args.name).id + ) + # TODO(mattf): No indication of result + + +# +# Jobs +# ~~~~ +# job-template-list +# +# job-template-show --name <name>|--id <id> +# +# job-template-create --name <name> +# --type <Pig|Hive|MapReduce|Java|...> +# [--mains <array of string>] +# [--libs <array of string>] +# [--description <desc>] +# +# job-template-delete --name <name>|--id <id> +# + +def do_job_template_list(cs, args): + """Print a list of job templates.""" + templates = cs.jobs.list() + columns = ('id', 'name', 'description') + utils.print_list(templates, columns) + + +@utils.arg('--name', + help='Name of the job template.') +@utils.arg('--id', + help='ID of the job template.') +def do_job_template_show(cs, args): + """Show details of a job template.""" + _show_job_template(_get_by_id_or_name(cs.jobs, args.id, args.name)) + + +@utils.arg('--name', + required=True, + help='Name of the job template.') +@utils.arg('--type', + required=True, + help='Type of the job template.') +@utils.arg('--main', + action='append', + default=[], + help='ID for job\'s main job-binary.') +@utils.arg('--lib', + action='append', + default=[], + help='ID of job\'s lib job-binary, repeatable.') +@utils.arg('--description', + default='', + help='Description of the job template.') +def do_job_template_create(cs, args): + """Create a job template.""" + _show_job_template(cs.jobs.create(args.name, args.type, + args.main, args.lib, + args.description)) + + +@utils.arg('--name', + help='Name of the job template.') +@utils.arg('--id', + help='ID of the job template.') +def do_job_template_delete(cs, args): + """Delete a job template.""" + cs.jobs.delete( + args.id or _get_by_id_or_name(cs.jobs, name=args.name).id + ) + # TODO(mattf): No indication of result + + +# +# Job Executions +# ~~~~~~~~~~~~~~ +# job-list +# +# job-show --id <id> +# +# job-create --job-template <id> --cluster <id> +# [--input-data <id>] [--output-data <id>] +# [--param <name=value>] +# [--arg <arg>] +# [--config <name=value>] +# +# job-delete --id <id> +# + +def do_job_list(cs, args): + """Print a list of jobs.""" + jobs = cs.job_executions.list() + for job in jobs: + # why is status in info.status? + job.status = job.info['status'] + # TODO(mattf): why can cluster_id be None? + columns = ('id', 'cluster_id', 'status') + utils.print_list(jobs, columns) + + +@utils.arg('--id', + required=True, + help='ID of the job.') +def do_job_show(cs, args): + """Show details of a job.""" + _show_job(cs.job_executions.get(args.id)) + + +@utils.arg('--job-template', + required=True, + help='ID of the job template to run.') +@utils.arg('--cluster', + required=True, + help='ID of the cluster to run the job in.') +@utils.arg('--input-data', + default=None, + help='ID of the input data source.') +@utils.arg('--output-data', + default=None, + help='ID of the output data source.') +@utils.arg('--param', + metavar='name=value', + action='append', + default=[], + help='Parameters to add to the job, repeatable.') +@utils.arg('--arg', + action='append', + default=[], + help='Arguments to add to the job, repeatable.') +@utils.arg('--config', + metavar='name=value', + action='append', + default=[], + help='Config parameters to add to the job, repeatable.') +def do_job_create(cs, args): + _convert = lambda ls: dict(map(lambda i: i.split('=', 1), ls)) + _show_job(cs.job_executions.create(args.job_template, args.cluster, + args.input_data, args.output_data, + {'params': _convert(args.param), + 'args': args.arg, + 'configs': _convert(args.config)})) + + +@utils.arg('--id', + required=True, + help='ID of a job.') +def do_job_delete(cs, args): + """Delete a job.""" + cs.job_executions.delete(args.id) + # TODO(mattf): No indication of result |