summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--saharaclient/osc/v1/jobs.py368
-rw-r--r--saharaclient/tests/unit/osc/v1/test_jobs.py323
-rw-r--r--setup.cfg6
3 files changed, 697 insertions, 0 deletions
diff --git a/saharaclient/osc/v1/jobs.py b/saharaclient/osc/v1/jobs.py
new file mode 100644
index 0000000..7c836ae
--- /dev/null
+++ b/saharaclient/osc/v1/jobs.py
@@ -0,0 +1,368 @@
+# Copyright (c) 2015 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from cliff import command
+from cliff import lister
+from cliff import show
+from openstackclient.common import exceptions
+from openstackclient.common import utils as osc_utils
+from oslo_log import log as logging
+from oslo_serialization import jsonutils
+
+from saharaclient.osc.v1 import utils
+
# Fields shown (in prepared/sorted form) by ExecuteJob, ShowJob and UpdateJob.
JOB_FIELDS = ['id', 'job_template_id', 'cluster_id', 'input_id', 'output_id',
              'start_time', 'end_time', 'status', 'is_public', 'is_protected',
              'engine_job_id']

# Valid values for the ListJobs --status filter. Hyphenated, lower-case on
# the CLI; ListJobs compares them against the server status after removing
# '-' and upper-casing (e.g. 'done-with-error' -> 'DONEWITHERROR').
JOB_STATUS_CHOICES = ['done-with-error', 'failed', 'killed', 'pending',
                      'running', 'succeeded', 'to-be-killed']
+
+
+def _format_job_output(data):
+ data['status'] = data['info']['status']
+ del data['info']
+ data['job_template_id'] = data.pop('job_id')
+
+
class ExecuteJob(show.ShowOne):
    """Executes job"""

    log = logging.getLogger(__name__ + ".ExecuteJob")

    def get_parser(self, prog_name):
        parser = super(ExecuteJob, self).get_parser(prog_name)

        parser.add_argument(
            '--job-template',
            metavar="<job-template>",
            help="Name or ID of the job template "
                 "[REQUIRED if JSON is not provided]",
        )
        parser.add_argument(
            '--cluster',
            metavar="<cluster>",
            help="Name or ID of the cluster "
                 "[REQUIRED if JSON is not provided]",
        )
        parser.add_argument(
            '--input',
            metavar="<input>",
            help="Name or ID of the input data source",
        )
        parser.add_argument(
            '--output',
            metavar="<output>",
            help="Name or ID of the output data source",
        )
        parser.add_argument(
            '--params',
            metavar="<name:value>",
            nargs='+',
            help="Parameters to add to the job"
        )
        parser.add_argument(
            '--args',
            metavar="<argument>",
            nargs='+',
            help="Arguments to add to the job"
        )
        parser.add_argument(
            '--public',
            action='store_true',
            default=False,
            help='Make the job public',
        )
        parser.add_argument(
            '--protected',
            action='store_true',
            default=False,
            help='Make the job protected',
        )
        # --config-json and --configs are alternative ways to provide the
        # same data, so they must not be combined.
        configs = parser.add_mutually_exclusive_group()
        configs.add_argument(
            '--config-json',
            metavar='<filename>',
            help='JSON representation of the job configs'
        )
        configs.add_argument(
            '--configs',
            metavar="<name:value>",
            nargs='+',
            help="Configs to add to the job"
        )
        parser.add_argument(
            '--interface',
            metavar='<filename>',
            help='JSON representation of the interface'
        )
        parser.add_argument(
            '--json',
            metavar='<filename>',
            help='JSON representation of the job. Other arguments will not be '
                 'taken into account if this one is provided'
        )
        return parser

    def take_action(self, parsed_args):
        """Create a job execution and return it as show-one columns.

        Either takes a complete JSON template (--json) or assembles the
        execution from the individual command-line arguments; in the
        latter case --cluster and --job-template are mandatory.
        """
        self.log.debug("take_action(%s)" % parsed_args)
        client = self.app.client_manager.data_processing

        if parsed_args.json:
            # The template fully describes the execution; every other
            # argument is intentionally ignored in this branch.
            blob = osc_utils.read_blob_file_contents(parsed_args.json)
            try:
                template = jsonutils.loads(blob)
            except ValueError as e:
                raise exceptions.CommandError(
                    'An error occurred when reading '
                    'template from file %s: %s' % (parsed_args.json, e))

            if 'job_configs' in template:
                # The create() API expects this field under the name
                # 'configs'.
                template['configs'] = template.pop('job_configs')

            data = client.job_executions.create(**template).to_dict()
        else:
            if not parsed_args.cluster or not parsed_args.job_template:
                raise exceptions.CommandError(
                    'At least --cluster, --job-template, arguments should be '
                    'specified or json template should be provided with '
                    '--json argument')

            job_configs = {}

            if parsed_args.interface:
                # Fix: read the file given via --interface (the original
                # code mistakenly read parsed_args.json here and reported
                # the wrong filename on error).
                blob = osc_utils.read_blob_file_contents(
                    parsed_args.interface)
                try:
                    parsed_args.interface = jsonutils.loads(blob)
                except ValueError as e:
                    raise exceptions.CommandError(
                        'An error occurred when reading '
                        'interface from file %s: %s' % (
                            parsed_args.interface, e))

            if parsed_args.config_json:
                # Fix: read the file given via --config-json (the original
                # code mistakenly read parsed_args.configs here and
                # reported the wrong filename on error).
                blob = osc_utils.read_blob_file_contents(
                    parsed_args.config_json)
                try:
                    job_configs['configs'] = jsonutils.loads(blob)
                except ValueError as e:
                    raise exceptions.CommandError(
                        'An error occurred when reading '
                        'configs from file %s: %s' % (
                            parsed_args.config_json, e))
            elif parsed_args.configs:
                # "name:value" pairs; split on the first ':' only so values
                # may themselves contain colons.
                job_configs['configs'] = dict(
                    map(lambda x: x.split(':', 1), parsed_args.configs))

            if parsed_args.args:
                job_configs['args'] = parsed_args.args

            if parsed_args.params:
                job_configs['params'] = dict(
                    map(lambda x: x.split(':', 1), parsed_args.params))

            # Resolve user-supplied names/IDs to resource IDs.
            jt_id = utils.get_resource(
                client.jobs, parsed_args.job_template).id
            cluster_id = utils.get_resource(
                client.clusters, parsed_args.cluster).id

            input_id = utils.get_resource(
                client.data_sources, parsed_args.input).id
            output_id = utils.get_resource(
                client.data_sources, parsed_args.output).id

            data = client.job_executions.create(
                job_id=jt_id, cluster_id=cluster_id, input_id=input_id,
                output_id=output_id, interface=parsed_args.interface,
                configs=job_configs, is_public=parsed_args.public,
                is_protected=parsed_args.protected).to_dict()

        _format_job_output(data)
        data = utils.prepare_data(data, JOB_FIELDS)

        return self.dict2columns(data)
+
+
class ListJobs(lister.Lister):
    """Lists jobs"""

    log = logging.getLogger(__name__ + ".ListJobs")

    def get_parser(self, prog_name):
        parser = super(ListJobs, self).get_parser(prog_name)
        parser.add_argument(
            '--long',
            action='store_true',
            default=False,
            help='List additional fields in output',
        )
        parser.add_argument(
            '--status',
            metavar="<status>",
            choices=JOB_STATUS_CHOICES,
            help="List jobs with specific status"
        )

        return parser

    def take_action(self, parsed_args):
        """List job executions, optionally filtered by status."""
        self.log.debug("take_action(%s)" % parsed_args)
        client = self.app.client_manager.data_processing

        jobs = client.job_executions.list()
        for job in jobs:
            # Surface the nested execution status as a plain attribute so
            # get_item_properties() can pick it up by column name.
            job.status = job.info['status']

        if parsed_args.status:
            # CLI choices are hyphenated and lower-case; the server reports
            # e.g. 'DONEWITHERROR', so normalize before comparing.
            wanted = parsed_args.status.replace('-', '').upper()
            jobs = [job for job in jobs if job.info['status'] == wanted]

        columns = ['id', 'cluster id', 'job id', 'status']
        if parsed_args.long:
            columns += ['start time', 'end time']
        columns = tuple(columns)
        column_headers = utils.prepare_column_headers(columns)

        return (
            column_headers,
            (osc_utils.get_item_properties(job, columns) for job in jobs)
        )
+
+
class ShowJob(show.ShowOne):
    """Display job details"""

    log = logging.getLogger(__name__ + ".ShowJob")

    def get_parser(self, prog_name):
        parser = super(ShowJob, self).get_parser(prog_name)
        parser.add_argument(
            "job",
            metavar="<job>",
            help="ID of the job to display",
        )

        return parser

    def take_action(self, parsed_args):
        """Fetch one job execution and return it as show-one columns."""
        self.log.debug("take_action(%s)" % parsed_args)
        client = self.app.client_manager.data_processing

        job_execution = client.job_executions.get(parsed_args.job)
        data = job_execution.to_dict()
        _format_job_output(data)

        return self.dict2columns(utils.prepare_data(data, JOB_FIELDS))
+
+
class DeleteJob(command.Command):
    """Deletes job"""

    log = logging.getLogger(__name__ + ".DeleteJob")

    def get_parser(self, prog_name):
        parser = super(DeleteJob, self).get_parser(prog_name)
        parser.add_argument(
            "job",
            metavar="<job>",
            nargs="+",
            help="ID(s) of the job(s) to delete",
        )
        parser.add_argument(
            '--wait',
            action='store_true',
            default=False,
            help='Wait for the job(s) delete to complete',
        )

        return parser

    def take_action(self, parsed_args):
        """Delete the given job executions; optionally wait for completion."""
        self.log.debug("take_action(%s)" % parsed_args)
        client = self.app.client_manager.data_processing

        # Fire off every delete request first, then (optionally) poll each
        # one so the deletions proceed in parallel on the server side.
        for job_id in parsed_args.job:
            client.job_executions.delete(job_id)

        if not parsed_args.wait:
            return

        for job_id in parsed_args.job:
            if not utils.wait_for_delete(client.job_executions, job_id):
                self.log.error(
                    'Error occurred during job deleting: %s',
                    job_id)
+
+
class UpdateJob(show.ShowOne):
    """Updates job"""

    log = logging.getLogger(__name__ + ".UpdateJob")

    def get_parser(self, prog_name):
        parser = super(UpdateJob, self).get_parser(prog_name)

        parser.add_argument(
            'job',
            metavar="<job>",
            help="ID of the job to update",
        )
        # --public/--private and --protected/--unprotected are paired
        # tri-state switches: unset (None) means "leave unchanged".
        public = parser.add_mutually_exclusive_group()
        public.add_argument(
            '--public',
            action='store_true',
            help='Make the job public (Visible from other tenants)',
            dest='is_public'
        )
        public.add_argument(
            '--private',
            action='store_false',
            help='Make the job private (Visible only from this tenant)',
            dest='is_public'
        )
        protected = parser.add_mutually_exclusive_group()
        protected.add_argument(
            '--protected',
            action='store_true',
            help='Make the job protected',
            dest='is_protected'
        )
        protected.add_argument(
            '--unprotected',
            action='store_false',
            help='Make the job unprotected',
            dest='is_protected'
        )

        parser.set_defaults(is_public=None, is_protected=None)

        return parser

    def take_action(self, parsed_args):
        """Update visibility/protection flags of one job execution."""
        self.log.debug("take_action(%s)" % parsed_args)
        client = self.app.client_manager.data_processing

        updated = client.job_executions.update(
            parsed_args.job,
            is_public=parsed_args.is_public,
            is_protected=parsed_args.is_protected
        )
        data = updated.job_execution

        _format_job_output(data)
        return self.dict2columns(utils.prepare_data(data, JOB_FIELDS))
diff --git a/saharaclient/tests/unit/osc/v1/test_jobs.py b/saharaclient/tests/unit/osc/v1/test_jobs.py
new file mode 100644
index 0000000..f907a8e
--- /dev/null
+++ b/saharaclient/tests/unit/osc/v1/test_jobs.py
@@ -0,0 +1,323 @@
+# Copyright (c) 2015 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+
+from openstackclient.tests import utils as osc_utils
+
+from saharaclient.api import job_executions as api_je
+from saharaclient.osc.v1 import jobs as osc_je
+from saharaclient.tests.unit.osc.v1 import fakes
+
# Canonical fake job-execution payload shared by all test cases below.
# Shape mirrors what the Sahara job_executions API returns: the execution
# status lives under the nested 'info' dict, and the job template ID is
# exposed as 'job_id' (the CLI renames it to 'job_template_id' on output).
JOB_EXECUTION_INFO = {
    "is_public": False,
    "id": "je_id",
    "interface": [],
    "is_protected": False,
    "input_id": 'input_id',
    "output_id": 'output_id',
    "job_id": "job_id",
    "cluster_id": 'cluster_id',
    "start_time": "start",
    "end_time": "end",
    "engine_job_id": "engine_job_id",
    "info": {
        "status": 'SUCCEEDED'
    },
    "job_configs": {
        "configs": {
            "config1": "1",
            "config2": "2"
        },
        "args": [
            "arg1",
            "arg2"
        ],
        "params": {
            "param2": "value2",
            "param1": "value1"
        }
    }
}
+
+
class TestJobs(fakes.TestDataProcessing):
    """Common base: exposes a clean job_executions mock for every test."""

    def setUp(self):
        super(TestJobs, self).setUp()
        self.je_mock = self.app.client_manager.data_processing.job_executions
        self.je_mock.reset_mock()
+
+
class TestExecuteJob(TestJobs):
    """Tests for the 'dataprocessing job execute' command."""

    # TODO(apavlov): check for execution with --interface, --configs, --json
    def setUp(self):
        super(TestExecuteJob, self).setUp()
        self.je_mock.create.return_value = api_je.JobExecution(
            None, JOB_EXECUTION_INFO)
        # Name-to-ID resolution (utils.get_resource) is backed by these
        # find_unique mocks for data sources, clusters and job templates.
        self.ds_mock = self.app.client_manager.data_processing.data_sources
        self.ds_mock.find_unique.return_value = mock.Mock(id='ds_id')
        self.c_mock = self.app.client_manager.data_processing.clusters
        self.c_mock.find_unique.return_value = mock.Mock(id='cluster_id')
        self.jt_mock = self.app.client_manager.data_processing.jobs
        self.jt_mock.find_unique.return_value = mock.Mock(id='job_id')
        self.ds_mock.reset_mock()
        self.c_mock.reset_mock()
        self.jt_mock.reset_mock()

        # Command to test
        self.cmd = osc_je.ExecuteJob(self.app, None)

    def test_job_execute_minimum_options(self):
        # Only the two required arguments; everything else takes defaults.
        arglist = ['--job-template', 'job-template', '--cluster', 'cluster']
        verifylist = [('job_template', 'job-template'), ('cluster', 'cluster')]

        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        self.cmd.take_action(parsed_args)

        # Check that correct arguments were passed
        self.je_mock.create.assert_called_once_with(
            cluster_id='cluster_id', configs={}, input_id='ds_id',
            interface=None, is_protected=False, is_public=False,
            job_id='job_id', output_id='ds_id')

    def test_job_execute_all_options(self):
        arglist = ['--job-template', 'job-template', '--cluster', 'cluster',
                   '--input', 'input', '--output', 'output', '--params',
                   'param1:value1', 'param2:value2', '--args', 'arg1', 'arg2',
                   '--configs', 'config1:1', 'config2:2', '--public',
                   '--protected']

        verifylist = [('job_template', 'job-template'), ('cluster', 'cluster'),
                      ('input', 'input'), ('output', 'output'),
                      ('params', ['param1:value1', 'param2:value2']),
                      ('args', ['arg1', 'arg2']),
                      ('configs', ['config1:1', 'config2:2']),
                      ('public', True),
                      ('protected', True)]

        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)

        # Check that correct arguments were passed
        self.je_mock.create.assert_called_once_with(
            cluster_id='cluster_id',
            configs={'configs': {'config1': '1', 'config2': '2'},
                     'args': ['arg1', 'arg2'],
                     'params': {'param2': 'value2', 'param1': 'value1'}},
            input_id='ds_id', interface=None, is_protected=True,
            is_public=True, job_id='job_id', output_id='ds_id')

        # Check that columns are correct (alphabetical, prettified headers)
        expected_columns = ('Cluster id', 'End time', 'Engine job id', 'Id',
                            'Input id', 'Is protected', 'Is public',
                            'Job template id', 'Output id', 'Start time',
                            'Status')
        self.assertEqual(expected_columns, columns)

        # Check that data is correct
        expected_data = ('cluster_id', 'end', 'engine_job_id', 'je_id',
                         'input_id', False, False, 'job_id', 'output_id',
                         'start', 'SUCCEEDED')
        self.assertEqual(expected_data, data)
+
+
class TestListJobs(TestJobs):
    """Tests for the 'dataprocessing job list' command."""

    def setUp(self):
        super(TestListJobs, self).setUp()
        self.je_mock.list.return_value = [api_je.JobExecution(
            None, JOB_EXECUTION_INFO)]

        # Command to test
        self.cmd = osc_je.ListJobs(self.app, None)

    def test_jobs_list_no_options(self):
        arglist = []
        verifylist = []

        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)

        # Check that columns are correct
        expected_columns = ['Id', 'Cluster id', 'Job id', 'Status']
        self.assertEqual(expected_columns, columns)

        # Check that data is correct
        expected_data = [('je_id', 'cluster_id', 'job_id', 'SUCCEEDED')]
        self.assertEqual(expected_data, list(data))

    def test_jobs_list_long(self):
        # --long adds the start/end time columns.
        arglist = ['--long']
        verifylist = [('long', True)]

        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)

        # Check that columns are correct
        expected_columns = ['Id', 'Cluster id', 'Job id', 'Status',
                            'Start time', 'End time']
        self.assertEqual(expected_columns, columns)

        # Check that data is correct
        expected_data = [('je_id', 'cluster_id', 'job_id', 'SUCCEEDED',
                          'start', 'end')]
        self.assertEqual(expected_data, list(data))

    def test_jobs_list_extra_search_opts(self):
        # 'succeeded' should match the fixture's 'SUCCEEDED' status after
        # the command normalizes the filter value.
        arglist = ['--status', 'succeeded']
        verifylist = [('status', 'succeeded')]

        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)

        # Check that columns are correct
        expected_columns = ['Id', 'Cluster id', 'Job id', 'Status']
        self.assertEqual(expected_columns, columns)

        # Check that data is correct
        expected_data = [('je_id', 'cluster_id', 'job_id', 'SUCCEEDED')]
        self.assertEqual(expected_data, list(data))
+
+
class TestShowJob(TestJobs):
    """Tests for the 'dataprocessing job show' command."""

    def setUp(self):
        super(TestShowJob, self).setUp()
        self.je_mock.get.return_value = api_je.JobExecution(
            None, JOB_EXECUTION_INFO)

        # Command to test
        self.cmd = osc_je.ShowJob(self.app, None)

    def test_job_show(self):
        arglist = ['job_id']
        verifylist = [('job', 'job_id')]

        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)

        # Check that correct arguments were passed
        self.je_mock.get.assert_called_once_with('job_id')

        # Check that columns are correct
        expected_columns = ('Cluster id', 'End time', 'Engine job id', 'Id',
                            'Input id', 'Is protected', 'Is public',
                            'Job template id', 'Output id', 'Start time',
                            'Status')
        self.assertEqual(expected_columns, columns)

        # Check that data is correct
        expected_data = ('cluster_id', 'end', 'engine_job_id', 'je_id',
                         'input_id', False, False, 'job_id', 'output_id',
                         'start', 'SUCCEEDED')
        self.assertEqual(expected_data, data)
+
+
class TestDeleteJob(TestJobs):
    """Tests for the 'dataprocessing job delete' command."""

    def setUp(self):
        super(TestDeleteJob, self).setUp()
        self.je_mock.get.return_value = api_je.JobExecution(
            None, JOB_EXECUTION_INFO)

        # Command to test
        self.cmd = osc_je.DeleteJob(self.app, None)

    def test_job_delete(self):
        arglist = ['job_id']
        verifylist = [('job', ['job_id'])]  # 'job' is nargs='+', hence a list

        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        self.cmd.take_action(parsed_args)

        # Check that correct arguments were passed
        self.je_mock.delete.assert_called_once_with('job_id')
+
+
class TestUpdateJob(TestJobs):
    """Tests for the 'dataprocessing job update' command."""

    def setUp(self):
        super(TestUpdateJob, self).setUp()
        self.je_mock.get.return_value = api_je.JobExecution(
            None, JOB_EXECUTION_INFO)
        self.je_mock.update.return_value = mock.Mock(
            job_execution=JOB_EXECUTION_INFO.copy())

        # Command to test
        self.cmd = osc_je.UpdateJob(self.app, None)

    def test_job_update_no_options(self):
        # The positional 'job' argument is required.
        arglist = []
        verifylist = []

        self.assertRaises(osc_utils.ParserException, self.check_parser,
                          self.cmd, arglist, verifylist)

    def test_job_update_nothing_updated(self):
        # With no flags both tri-state values stay None ("leave unchanged").
        arglist = ['job_id']

        verifylist = [('job', 'job_id')]

        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        self.cmd.take_action(parsed_args)

        # Check that correct arguments were passed
        self.je_mock.update.assert_called_once_with(
            'job_id', is_protected=None, is_public=None)

    def test_job_update_public_protected(self):
        arglist = ['job_id', '--public', '--protected']

        verifylist = [('job', 'job_id'), ('is_public', True),
                      ('is_protected', True)]

        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        columns, data = self.cmd.take_action(parsed_args)

        # Check that correct arguments were passed
        self.je_mock.update.assert_called_once_with(
            'job_id', is_protected=True, is_public=True)

        # Check that columns are correct
        expected_columns = ('Cluster id', 'End time', 'Engine job id', 'Id',
                            'Input id', 'Is protected', 'Is public',
                            'Job template id', 'Output id', 'Start time',
                            'Status')
        self.assertEqual(expected_columns, columns)

        # Check that data is correct (values come from the mocked return,
        # not from the flags passed on the command line)
        expected_data = ('cluster_id', 'end', 'engine_job_id', 'je_id',
                         'input_id', False, False, 'job_id', 'output_id',
                         'start', 'SUCCEEDED')
        self.assertEqual(expected_data, data)

    def test_job_update_private_unprotected(self):
        arglist = ['job_id', '--private', '--unprotected']

        verifylist = [('job', 'job_id'), ('is_public', False),
                      ('is_protected', False)]

        parsed_args = self.check_parser(self.cmd, arglist, verifylist)

        self.cmd.take_action(parsed_args)

        # Check that correct arguments were passed
        self.je_mock.update.assert_called_once_with(
            'job_id', is_protected=False, is_public=False)
diff --git a/setup.cfg b/setup.cfg
index 2862ad1..a9ac163 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -82,6 +82,12 @@ openstack.data_processing.v1 =
dataprocessing_job_type_list = saharaclient.osc.v1.job_types:ListJobTypes
dataprocessing_job_type_configs_get = saharaclient.osc.v1.job_types:GetJobTypeConfigs
+ dataprocessing_job_execute = saharaclient.osc.v1.jobs:ExecuteJob
+ dataprocessing_job_list = saharaclient.osc.v1.jobs:ListJobs
+ dataprocessing_job_show = saharaclient.osc.v1.jobs:ShowJob
+ dataprocessing_job_update = saharaclient.osc.v1.jobs:UpdateJob
+ dataprocessing_job_delete = saharaclient.osc.v1.jobs:DeleteJob
+
[build_sphinx]
all_files = 1
build-dir = doc/build