diff options
-rw-r--r-- | saharaclient/osc/v1/jobs.py | 368 | ||||
-rw-r--r-- | saharaclient/tests/unit/osc/v1/test_jobs.py | 323 | ||||
-rw-r--r-- | setup.cfg | 6 |
3 files changed, 697 insertions, 0 deletions
diff --git a/saharaclient/osc/v1/jobs.py b/saharaclient/osc/v1/jobs.py new file mode 100644 index 0000000..7c836ae --- /dev/null +++ b/saharaclient/osc/v1/jobs.py @@ -0,0 +1,368 @@ +# Copyright (c) 2015 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from cliff import command +from cliff import lister +from cliff import show +from openstackclient.common import exceptions +from openstackclient.common import utils as osc_utils +from oslo_log import log as logging +from oslo_serialization import jsonutils + +from saharaclient.osc.v1 import utils + +JOB_FIELDS = ['id', 'job_template_id', 'cluster_id', 'input_id', 'output_id', + 'start_time', 'end_time', 'status', 'is_public', 'is_protected', + 'engine_job_id'] + +JOB_STATUS_CHOICES = ['done-with-error', 'failed', 'killed', 'pending', + 'running', 'succeeded', 'to-be-killed'] + + +def _format_job_output(data): + data['status'] = data['info']['status'] + del data['info'] + data['job_template_id'] = data.pop('job_id') + + +class ExecuteJob(show.ShowOne): + """Executes job""" + + log = logging.getLogger(__name__ + ".ExecuteJob") + + def get_parser(self, prog_name): + parser = super(ExecuteJob, self).get_parser(prog_name) + + parser.add_argument( + '--job-template', + metavar="<job-template>", + help="Name or ID of the job template " + "[REQUIRED if JSON is not provided]", + ) + parser.add_argument( + '--cluster', + metavar="<cluster>", + help="Name or ID of the cluster " + "[REQUIRED if JSON is not provided]", + ) + parser.add_argument( + '--input', + metavar="<input>", + help="Name or ID of the input data source", + ) + parser.add_argument( + '--output', + metavar="<output>", + help="Name or ID of the output data source", + ) + parser.add_argument( + '--params', + metavar="<name:value>", + nargs='+', + help="Parameters to add to the job" + ) + parser.add_argument( + '--args', + metavar="<argument>", + nargs='+', + help="Arguments to add to the job" + ) + parser.add_argument( + '--public', + action='store_true', + default=False, + help='Make the job public', + ) + parser.add_argument( + '--protected', + action='store_true', + default=False, + help='Make the job protected', + ) + configs = parser.add_mutually_exclusive_group() + configs.add_argument( + '--config-json', + metavar='<filename>', + help='JSON representation of the job configs' + ) + configs.add_argument( + '--configs', + metavar="<name:value>", + nargs='+', + help="Configs to add to the job" + ) + parser.add_argument( + '--interface', + metavar='<filename>', + help='JSON representation of the interface' + ) + parser.add_argument( + '--json', + metavar='<filename>', + help='JSON representation of the job. Other arguments will not be ' + 'taken into account if this one is provided' + ) + return parser + + def take_action(self, parsed_args): + self.log.debug("take_action(%s)" % parsed_args) + client = self.app.client_manager.data_processing + + if parsed_args.json: + blob = osc_utils.read_blob_file_contents(parsed_args.json) + try: + template = jsonutils.loads(blob) + except ValueError as e: + raise exceptions.CommandError( + 'An error occurred when reading ' + 'template from file %s: %s' % (parsed_args.json, e)) + + if 'job_configs' in template: + template['configs'] = template.pop('job_configs') + + data = client.job_executions.create(**template).to_dict() + else: + if not parsed_args.cluster or not parsed_args.job_template: + raise exceptions.CommandError( + 'At least --cluster, --job-template, arguments should be ' + 'specified or json template should be provided with ' + '--json argument') + + job_configs = {} + + if parsed_args.interface: + blob = osc_utils.read_blob_file_contents(parsed_args.json) + try: + parsed_args.interface = jsonutils.loads(blob) + except ValueError as e: + raise exceptions.CommandError( + 'An error occurred when reading ' + 'interface from file %s: %s' % (parsed_args.json, e)) + + if parsed_args.config_json: + blob = osc_utils.read_blob_file_contents(parsed_args.configs) + try: + job_configs['configs'] = jsonutils.loads(blob) + except ValueError as e: + raise exceptions.CommandError( + 'An error occurred when reading ' + 'configs from file %s: %s' % (parsed_args.json, e)) + elif parsed_args.configs: + job_configs['configs'] = dict( + map(lambda x: x.split(':', 1), parsed_args.configs)) + + if parsed_args.args: + job_configs['args'] = parsed_args.args + + if parsed_args.params: + job_configs['params'] = dict( + map(lambda x: x.split(':', 1), parsed_args.params)) + + jt_id = utils.get_resource( + client.jobs, parsed_args.job_template).id + cluster_id = utils.get_resource( + client.clusters, parsed_args.cluster).id + + input_id = utils.get_resource( + client.data_sources, parsed_args.input).id + output_id = utils.get_resource( + client.data_sources, parsed_args.output).id + + data = client.job_executions.create( + job_id=jt_id, cluster_id=cluster_id, input_id=input_id, + output_id=output_id, interface=parsed_args.interface, + configs=job_configs, is_public=parsed_args.public, + is_protected=parsed_args.protected).to_dict() + + _format_job_output(data) + data = utils.prepare_data(data, JOB_FIELDS) + + return self.dict2columns(data) + + +class ListJobs(lister.Lister): + """Lists jobs""" + + log = logging.getLogger(__name__ + ".ListJobs") + + def get_parser(self, prog_name): + parser = super(ListJobs, self).get_parser(prog_name) + parser.add_argument( + '--long', + action='store_true', + default=False, + help='List additional fields in output', + ) + parser.add_argument( + '--status', + metavar="<status>", + choices=JOB_STATUS_CHOICES, + help="List jobs with specific status" + ) + + return parser + + def take_action(self, parsed_args): + self.log.debug("take_action(%s)" % parsed_args) + client = self.app.client_manager.data_processing + + data = client.job_executions.list() + for job in data: + job.status = job.info['status'] + + if parsed_args.status: + data = [job for job in data + if job.info['status'] == parsed_args.status.replace( + '-', '').upper()] + + if parsed_args.long: + columns = ('id', 'cluster id', 'job id', 'status', 'start time', + 'end time') + column_headers = utils.prepare_column_headers(columns) + + else: + columns = ('id', 'cluster id', 'job id', 'status') + column_headers = utils.prepare_column_headers(columns) + + return ( + column_headers, + (osc_utils.get_item_properties( + s, + columns + ) for s in data) + ) + + +class ShowJob(show.ShowOne): + """Display job details""" + + log = logging.getLogger(__name__ + ".ShowJob") + + def get_parser(self, prog_name): + parser = super(ShowJob, self).get_parser(prog_name) + parser.add_argument( + "job", + metavar="<job>", + help="ID of the job to display", + ) + + return parser + + def take_action(self, parsed_args): + self.log.debug("take_action(%s)" % parsed_args) + client = self.app.client_manager.data_processing + + data = client.job_executions.get(parsed_args.job).to_dict() + + _format_job_output(data) + data = utils.prepare_data(data, JOB_FIELDS) + + return self.dict2columns(data) + + +class DeleteJob(command.Command): + """Deletes job""" + + log = logging.getLogger(__name__ + ".DeleteJob") + + def get_parser(self, prog_name): + parser = super(DeleteJob, self).get_parser(prog_name) + parser.add_argument( + "job", + metavar="<job>", + nargs="+", + help="ID(s) of the job(s) to delete", + ) + parser.add_argument( + '--wait', + action='store_true', + default=False, + help='Wait for the job(s) delete to complete', + ) + + return parser + + def take_action(self, parsed_args): + self.log.debug("take_action(%s)" % parsed_args) + client = self.app.client_manager.data_processing + for job_id in parsed_args.job: + client.job_executions.delete(job_id) + + if parsed_args.wait: + for job_id in parsed_args.job: + if not utils.wait_for_delete(client.job_executions, job_id): + self.log.error( + 'Error occurred during job deleting: %s', + job_id) + + +class UpdateJob(show.ShowOne): + """Updates job""" + + log = logging.getLogger(__name__ + ".UpdateJob") + + def get_parser(self, prog_name): + parser = super(UpdateJob, self).get_parser(prog_name) + + parser.add_argument( + 'job', + metavar="<job>", + help="ID of the job to update", + ) + public = parser.add_mutually_exclusive_group() + public.add_argument( + '--public', + action='store_true', + help='Make the job public (Visible from other tenants)', + dest='is_public' + ) + public.add_argument( + '--private', + action='store_false', + help='Make the job private (Visible only from this tenant)', + dest='is_public' + ) + protected = parser.add_mutually_exclusive_group() + protected.add_argument( + '--protected', + action='store_true', + help='Make the job protected', + dest='is_protected' + ) + protected.add_argument( + '--unprotected', + action='store_false', + help='Make the job unprotected', + dest='is_protected' + ) + + parser.set_defaults(is_public=None, is_protected=None) + + return parser + + def take_action(self, parsed_args): + self.log.debug("take_action(%s)" % parsed_args) + client = self.app.client_manager.data_processing + + data = client.job_executions.update( + parsed_args.job, + is_public=parsed_args.is_public, + is_protected=parsed_args.is_protected + ).job_execution + + _format_job_output(data) + data = utils.prepare_data(data, JOB_FIELDS) + + return self.dict2columns(data) diff --git a/saharaclient/tests/unit/osc/v1/test_jobs.py b/saharaclient/tests/unit/osc/v1/test_jobs.py new file mode 100644 index 0000000..f907a8e --- /dev/null +++ b/saharaclient/tests/unit/osc/v1/test_jobs.py @@ -0,0 +1,323 @@ +# Copyright (c) 2015 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock + +from openstackclient.tests import utils as osc_utils + +from saharaclient.api import job_executions as api_je +from saharaclient.osc.v1 import jobs as osc_je +from saharaclient.tests.unit.osc.v1 import fakes + +JOB_EXECUTION_INFO = { + "is_public": False, + "id": "je_id", + "interface": [], + "is_protected": False, + "input_id": 'input_id', + "output_id": 'output_id', + "job_id": "job_id", + "cluster_id": 'cluster_id', + "start_time": "start", + "end_time": "end", + "engine_job_id": "engine_job_id", + "info": { + "status": 'SUCCEEDED' + }, + "job_configs": { + "configs": { + "config1": "1", + "config2": "2" + }, + "args": [ + "arg1", + "arg2" + ], + "params": { + "param2": "value2", + "param1": "value1" + } + } +} + + +class TestJobs(fakes.TestDataProcessing): + def setUp(self): + super(TestJobs, self).setUp() + self.je_mock = self.app.client_manager.data_processing.job_executions + self.je_mock.reset_mock() + + +class TestExecuteJob(TestJobs): + # TODO(apavlov): check for execution with --interface, --configs, --json + def setUp(self): + super(TestExecuteJob, self).setUp() + self.je_mock.create.return_value = api_je.JobExecution( + None, JOB_EXECUTION_INFO) + self.ds_mock = self.app.client_manager.data_processing.data_sources + self.ds_mock.find_unique.return_value = mock.Mock(id='ds_id') + self.c_mock = self.app.client_manager.data_processing.clusters + self.c_mock.find_unique.return_value = mock.Mock(id='cluster_id') + self.jt_mock = self.app.client_manager.data_processing.jobs + self.jt_mock.find_unique.return_value = mock.Mock(id='job_id') + self.ds_mock.reset_mock() + self.c_mock.reset_mock() + self.jt_mock.reset_mock() + + # Command to test + self.cmd = osc_je.ExecuteJob(self.app, None) + + def test_job_execute_minimum_options(self): + arglist = ['--job-template', 'job-template', '--cluster', 'cluster'] + verifylist = [('job_template', 'job-template'), ('cluster', 'cluster')] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.cmd.take_action(parsed_args) + + # Check that correct arguments were passed + self.je_mock.create.assert_called_once_with( + cluster_id='cluster_id', configs={}, input_id='ds_id', + interface=None, is_protected=False, is_public=False, + job_id='job_id', output_id='ds_id') + + def test_job_execute_all_options(self): + arglist = ['--job-template', 'job-template', '--cluster', 'cluster', + '--input', 'input', '--output', 'output', '--params', + 'param1:value1', 'param2:value2', '--args', 'arg1', 'arg2', + '--configs', 'config1:1', 'config2:2', '--public', + '--protected'] + + verifylist = [('job_template', 'job-template'), ('cluster', 'cluster'), + ('input', 'input'), ('output', 'output'), + ('params', ['param1:value1', 'param2:value2']), + ('args', ['arg1', 'arg2']), + ('configs', ['config1:1', 'config2:2']), + ('public', True), + ('protected', True)] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + # Check that correct arguments were passed + self.je_mock.create.assert_called_once_with( + cluster_id='cluster_id', + configs={'configs': {'config1': '1', 'config2': '2'}, + 'args': ['arg1', 'arg2'], + 'params': {'param2': 'value2', 'param1': 'value1'}}, + input_id='ds_id', interface=None, is_protected=True, + is_public=True, job_id='job_id', output_id='ds_id') + + # Check that columns are correct + expected_columns = ('Cluster id', 'End time', 'Engine job id', 'Id', + 'Input id', 'Is protected', 'Is public', + 'Job template id', 'Output id', 'Start time', + 'Status') + self.assertEqual(expected_columns, columns) + + # Check that data is correct + expected_data = ('cluster_id', 'end', 'engine_job_id', 'je_id', + 'input_id', False, False, 'job_id', 'output_id', + 'start', 'SUCCEEDED') + self.assertEqual(expected_data, data) + + +class TestListJobs(TestJobs): + def setUp(self): + super(TestListJobs, self).setUp() + self.je_mock.list.return_value = [api_je.JobExecution( + None, JOB_EXECUTION_INFO)] + + # Command to test + self.cmd = osc_je.ListJobs(self.app, None) + + def test_jobs_list_no_options(self): + arglist = [] + verifylist = [] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + # Check that columns are correct + expected_columns = ['Id', 'Cluster id', 'Job id', 'Status'] + self.assertEqual(expected_columns, columns) + + # Check that data is correct + expected_data = [('je_id', 'cluster_id', 'job_id', 'SUCCEEDED')] + self.assertEqual(expected_data, list(data)) + + def test_jobs_list_long(self): + arglist = ['--long'] + verifylist = [('long', True)] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + # Check that columns are correct + expected_columns = ['Id', 'Cluster id', 'Job id', 'Status', + 'Start time', 'End time'] + self.assertEqual(expected_columns, columns) + + # Check that data is correct + expected_data = [('je_id', 'cluster_id', 'job_id', 'SUCCEEDED', + 'start', 'end')] + self.assertEqual(expected_data, list(data)) + + def test_jobs_list_extra_search_opts(self): + arglist = ['--status', 'succeeded'] + verifylist = [('status', 'succeeded')] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + # Check that columns are correct + expected_columns = ['Id', 'Cluster id', 'Job id', 'Status'] + self.assertEqual(expected_columns, columns) + + # Check that data is correct + expected_data = [('je_id', 'cluster_id', 'job_id', 'SUCCEEDED')] + self.assertEqual(expected_data, list(data)) + + +class TestShowJob(TestJobs): + def setUp(self): + super(TestShowJob, self).setUp() + self.je_mock.get.return_value = api_je.JobExecution( + None, JOB_EXECUTION_INFO) + + # Command to test + self.cmd = osc_je.ShowJob(self.app, None) + + def test_job_show(self): + arglist = ['job_id'] + verifylist = [('job', 'job_id')] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + # Check that correct arguments were passed + self.je_mock.get.assert_called_once_with('job_id') + + # Check that columns are correct + expected_columns = ('Cluster id', 'End time', 'Engine job id', 'Id', + 'Input id', 'Is protected', 'Is public', + 'Job template id', 'Output id', 'Start time', + 'Status') + self.assertEqual(expected_columns, columns) + + # Check that data is correct + expected_data = ('cluster_id', 'end', 'engine_job_id', 'je_id', + 'input_id', False, False, 'job_id', 'output_id', + 'start', 'SUCCEEDED') + self.assertEqual(expected_data, data) + + +class TestDeleteJob(TestJobs): + def setUp(self): + super(TestDeleteJob, self).setUp() + self.je_mock.get.return_value = api_je.JobExecution( + None, JOB_EXECUTION_INFO) + + # Command to test + self.cmd = osc_je.DeleteJob(self.app, None) + + def test_job_delete(self): + arglist = ['job_id'] + verifylist = [('job', ['job_id'])] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.cmd.take_action(parsed_args) + + # Check that correct arguments were passed + self.je_mock.delete.assert_called_once_with('job_id') + + +class TestUpdateJob(TestJobs): + def setUp(self): + super(TestUpdateJob, self).setUp() + self.je_mock.get.return_value = api_je.JobExecution( + None, JOB_EXECUTION_INFO) + self.je_mock.update.return_value = mock.Mock( + job_execution=JOB_EXECUTION_INFO.copy()) + + # Command to test + self.cmd = osc_je.UpdateJob(self.app, None) + + def test_job_update_no_options(self): + arglist = [] + verifylist = [] + + self.assertRaises(osc_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_job_update_nothing_updated(self): + arglist = ['job_id'] + + verifylist = [('job', 'job_id')] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.cmd.take_action(parsed_args) + + # Check that correct arguments were passed + self.je_mock.update.assert_called_once_with( + 'job_id', is_protected=None, is_public=None) + + def test_job_update_public_protected(self): + arglist = ['job_id', '--public', '--protected'] + + verifylist = [('job', 'job_id'), ('is_public', True), + ('is_protected', True)] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + # Check that correct arguments were passed + self.je_mock.update.assert_called_once_with( + 'job_id', is_protected=True, is_public=True) + + # Check that columns are correct + expected_columns = ('Cluster id', 'End time', 'Engine job id', 'Id', + 'Input id', 'Is protected', 'Is public', + 'Job template id', 'Output id', 'Start time', + 'Status') + self.assertEqual(expected_columns, columns) + + # Check that data is correct + expected_data = ('cluster_id', 'end', 'engine_job_id', 'je_id', + 'input_id', False, False, 'job_id', 'output_id', + 'start', 'SUCCEEDED') + self.assertEqual(expected_data, data) + + def test_job_update_private_unprotected(self): + arglist = ['job_id', '--private', '--unprotected'] + + verifylist = [('job', 'job_id'), ('is_public', False), + ('is_protected', False)] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + self.cmd.take_action(parsed_args) + + # Check that correct arguments were passed + self.je_mock.update.assert_called_once_with( + 'job_id', is_protected=False, is_public=False) @@ -82,6 +82,12 @@ openstack.data_processing.v1 = dataprocessing_job_type_list = saharaclient.osc.v1.job_types:ListJobTypes dataprocessing_job_type_configs_get = saharaclient.osc.v1.job_types:GetJobTypeConfigs + dataprocessing_job_execute = saharaclient.osc.v1.jobs:ExecuteJob + dataprocessing_job_list = saharaclient.osc.v1.jobs:ListJobs + dataprocessing_job_show = saharaclient.osc.v1.jobs:ShowJob + dataprocessing_job_update = saharaclient.osc.v1.jobs:UpdateJob + dataprocessing_job_delete = saharaclient.osc.v1.jobs:DeleteJob + [build_sphinx] all_files = 1 build-dir = doc/build |