diff options
author | Aleksandr Mishchenko <AMishchenko@luxoft.com> | 2020-01-15 19:37:22 +0200 |
---|---|---|
committer | Vladyslav Mustafin <vmustafin@luxoft.com> | 2020-02-19 18:45:58 +0200 |
commit | ce593107c5fda551012c5e5a4698c1ceaef7034c (patch) | |
tree | 5430e7250c898bd44f47dc63971fd6520bfdebd1 /generator/generator.py | |
parent | 9e389ad7668b8ac5d47873f26cf314459a670d15 (diff) | |
download | sdl_ios-ce593107c5fda551012c5e5a4698c1ceaef7034c.tar.gz |
SDL-0234 Proxy Library RPC Generation
Diffstat (limited to 'generator/generator.py')
-rw-r--r-- | generator/generator.py | 479 |
1 files changed, 479 insertions, 0 deletions
diff --git a/generator/generator.py b/generator/generator.py new file mode 100644 index 000000000..1b2a06aa4 --- /dev/null +++ b/generator/generator.py @@ -0,0 +1,479 @@ +""" +Generator +""" +import asyncio +import json +import logging +import re +import sys +from argparse import ArgumentParser +from collections import namedtuple, OrderedDict +from datetime import datetime, date +from inspect import getfile +from json import JSONDecodeError +from os.path import basename, join +from re import findall + +from jinja2 import UndefinedError, TemplateNotFound, FileSystemLoader, Environment, ChoiceLoader, \ + TemplateAssertionError, TemplateSyntaxError, TemplateRuntimeError +from pathlib2 import Path + +ROOT = Path(__file__).absolute().parents[0] + +sys.path.append(ROOT.joinpath('rpc_spec/InterfaceParser').as_posix()) + +try: + from parsers.rpc_base import ParseError + from parsers.sdl_rpc_v2 import Parser + from model.interface import Interface + from transformers.common_producer import InterfaceProducerCommon as Common + from transformers.enums_producer import EnumsProducer + from transformers.functions_producer import FunctionsProducer + from transformers.structs_producer import StructsProducer +except ImportError as error: + print('%s.\nprobably you did not initialize submodule', error) + ParseError = Parser = Interface = Common = EnumsProducer = FunctionsProducer = StructsProducer = None + sys.exit(1) + + +class Generator: + """ + This class contains only technical features, as follow: + - parsing command-line arguments, or evaluating required container interactively; + - calling parsers to get Model from xml; + - calling producers to transform initial Model to dict used in jinja2 templates + Not required to be covered by unit tests cause contains only technical features. + """ + + def __init__(self): + self.logger = logging.getLogger(self.__class__.__name__) + self._env = None + self._output_directory = None + self.loop = asyncio.get_event_loop() + self.paths_named = namedtuple('paths_named', 'enum_class struct_class request_class response_class ' + 'notification_class function_names parameter_names') + + @property + def output_directory(self) -> Path: + """ + + :return: + """ + return self._output_directory + + @output_directory.setter + def output_directory(self, output_directory): + """ + + :param output_directory: + :return: + """ + if output_directory.startswith('/'): + path = Path(output_directory).absolute().resolve() + else: + path = ROOT.joinpath(output_directory).resolve() + if not path.exists(): + self.logger.warning('Directory not found: %s, trying to create it', path) + try: + path.mkdir(parents=True, exist_ok=True) + except OSError as message1: + self.logger.critical('Failed to create directory %s, %s', path.as_posix(), message1) + sys.exit(1) + self._output_directory = path + + @property + def env(self) -> Environment: + """ + + :return: + """ + return self._env + + @env.setter + def env(self, paths): + """ + + :param paths: + :return: + """ + loaders = list(filter(lambda l: Path(l).exists(), paths)) + if not loaders: + self.logger.error('Directory with templates not found %s', str(paths)) + sys.exit(1) + loaders = [FileSystemLoader(l) for l in loaders] + + self._env = Environment(loader=ChoiceLoader(loaders)) + self._env.filters['title'] = self.title + self._env.globals['year'] = date.today().year + + @staticmethod + def title(name): + """ + + :param name: + :return: + """ + return name[:1].upper() + name[1:] + + @property + def get_version(self): + """ + + :return: + """ + return Common.version + + def config_logging(self, verbose): + """ + + :param verbose: + :return: + """ + handler = logging.StreamHandler() + handler.setFormatter(logging.Formatter(fmt='%(asctime)s.%(msecs)03d - %(levelname)s - %(message)s', + datefmt='%H:%M:%S')) + root_logger = logging.getLogger() + + if verbose: + handler.setLevel(logging.DEBUG) + self.logger.setLevel(logging.DEBUG) + root_logger.setLevel(logging.DEBUG) + else: + handler.setLevel(logging.ERROR) + self.logger.setLevel(logging.ERROR) + root_logger.setLevel(logging.ERROR) + logging.getLogger().handlers.clear() + root_logger.addHandler(handler) + + def get_parser(self): + """ + Parsing command-line arguments, or evaluating required container interactively. + :return: an instance of argparse.ArgumentParser + """ + if len(sys.argv) == 2 and sys.argv[1] in ('-v', '--version'): + print(self.get_version) + sys.exit(0) + + container = namedtuple('container', 'name path') + xml = container('source_xml', ROOT.joinpath('rpc_spec/MOBILE_API.xml')) + required_source = not xml.path.exists() + + out = container('output_directory', ROOT.parents[0].joinpath('SmartDeviceLink')) + output_required = not out.path.exists() + + parser = ArgumentParser(description='Proxy Library RPC Generator') + parser.add_argument('-v', '--version', action='store_true', help='print the version and sys.exit') + parser.add_argument('-xml', '--source-xml', '--input-file', required=required_source, + help='should point to MOBILE_API.xml') + parser.add_argument('-xsd', '--source-xsd', required=False) + parser.add_argument('-d', '--output-directory', required=output_required, + help='define the place where the generated output should be placed') + parser.add_argument('-t', '--templates-directory', nargs='?', default=ROOT.joinpath('templates').as_posix(), + help='path to directory with templates') + parser.add_argument('-r', '--regex-pattern', required=False, + help='only elements matched with defined regex pattern will be parsed and generated') + parser.add_argument('--verbose', action='store_true', help='display additional details like logs etc') + parser.add_argument('-e', '--enums', required=False, action='store_true', + help='only specified elements will be generated, if present') + parser.add_argument('-s', '--structs', required=False, action='store_true', + help='only specified elements will be generated, if present') + parser.add_argument('-m', '-f', '--functions', required=False, action='store_true', + help='only specified elements will be generated, if present') + parser.add_argument('-y', '--overwrite', action='store_true', + help='force overwriting of existing files in output directory, ignore confirmation message') + parser.add_argument('-n', '--skip', action='store_true', + help='skip overwriting of existing files in output directory, ignore confirmation message') + + args, unknown = parser.parse_known_args() + + if unknown: + print('found unknown arguments: ' + ' '.join(unknown)) + parser.print_help(sys.stderr) + sys.exit(1) + + if args.skip and args.overwrite: + print('please select only one option skip "-n" or overwrite "-y"') + sys.exit(1) + + if not args.enums and not args.structs and not args.functions: + args.enums = args.structs = args.functions = True + + for kind in (xml, out): + if not getattr(args, kind.name) and kind.path.exists(): + while True: + try: + confirm = input('Confirm default path {} for {} Y/Enter = yes, N = no' + .format(kind.path, kind.name)) + if confirm.lower() == 'y' or not confirm: + print('{} set to {}'.format(kind.name, kind.path)) + setattr(args, kind.name, kind.path.as_posix()) + break + if confirm.lower() == 'n': + print('provide argument ' + kind.name) + sys.exit(1) + except KeyboardInterrupt: + print('\nThe user interrupted the execution of the program') + sys.exit(1) + + self.logger.debug('parsed arguments:\n%s', vars(args)) + + return args + + def versions_compatibility_validating(self): + """version of generator script requires the same or lesser version of parser script. + if the parser script needs to fix a bug (and becomes, e.g. 1.0.1) and the generator script stays at 1.0.0. + As long as the generator script is the same or greater major version, it should be parsable. + This requires some level of backward compatibility. E.g. they have to be the same major version. + + """ + + regex = r'(\d+\.\d+).(\d)' + + parser_origin = Parser().get_version + generator_origin = self.get_version + parser_split = findall(regex, parser_origin).pop() + generator_split = findall(regex, generator_origin).pop() + + parser_major = float(parser_split[0]) + generator_major = float(generator_split[0]) + + if parser_major > generator_major: + self.logger.critical('Generator (%s) requires the same or lesser version of Parser (%s)', + generator_origin, parser_origin) + sys.exit(1) + + self.logger.info('Parser type: %s, version %s,\tGenerator version %s', + basename(getfile(Parser().__class__)), parser_origin, generator_origin) + + async def get_paths(self, file_name=ROOT.joinpath('paths.ini')): + """ + :param file_name: path to file with container + :return: namedtuple with container to key elements + """ + data = OrderedDict() + try: + with file_name.open('r') as file: + for line in file: + if line.startswith('#'): + self.logger.warning('commented property %s, which will be skipped', line.strip()) + continue + if re.match(r'^(\w+)\s?=\s?(.+)', line): + if len(line.split('=')) > 2: + self.logger.critical('can not evaluate value, too many separators %s', str(line)) + sys.exit(1) + name, var = line.partition('=')[::2] + if name.strip() in data: + self.logger.critical('duplicate key %s', name) + sys.exit(1) + data[name.strip().lower()] = var.strip() + except FileNotFoundError as message1: + self.logger.critical(message1) + sys.exit(1) + + missed = list(set(self.paths_named._fields) - set(data.keys())) + if missed: + self.logger.critical('in %s missed fields: %s ', file, str(missed)) + sys.exit(1) + + return self.paths_named(**data) + + async def get_mappings(self, file=ROOT.joinpath('mapping.json')): + """ + The key name in *.json is equal to property named in jinja2 templates + :param file: path to file with manual mappings + :return: dictionary with custom manual mappings + """ + try: + with file.open('r') as handler: + content = handler.readlines() + return json.loads(''.join(content)) + except (FileNotFoundError, JSONDecodeError) as error1: + self.logger.error('Failure to get mappings %s', error1) + return OrderedDict() + + def write_file(self, file, templates, data): + """ + Calling producer/transformer instance to transform initial Model to dict used in jinja2 templates. + Applying transformed dict to jinja2 templates and writing to appropriate file + :param file: output js file + :param templates: name of template + :param data: an instance of transformer for particular item + """ + try: + render = self.env.get_or_select_template(templates).render(data) + with file.open('w', encoding='utf-8') as handler: + handler.write(render) + except (TemplateNotFound, UndefinedError, TemplateAssertionError, TemplateSyntaxError, TemplateRuntimeError) \ + as error1: + self.logger.error('skipping %s, template not found %s', file.as_posix(), error1) + + async def process_main(self, skip, overwrite, items, transformer): + """ + Process each item from initial Model. According to provided arguments skipping, overriding or asking what to to. + :param skip: if file exist skip it + :param overwrite: if file exist overwrite it + :param items: elements initial Model + :param transformer: producer/transformer instance + """ + tasks = [] + for item in items.values(): + render = transformer.transform(item) + file = self.output_directory.joinpath(render.get('name', item.name)) + for extension in ('.h', '.m'): + data = render.copy() + data['imports'] = data['imports'][extension] + file_with_suffix = file.with_suffix(extension) + templates = ['{}s/template{}'.format(type(item).__name__.lower(), extension)] + if 'template' in data: + templates.insert(0, data['template'] + extension) + tasks.append(self.process_common(skip, overwrite, file_with_suffix, data, templates)) + + await asyncio.gather(*tasks) + + async def process_function_name(self, skip, overwrite, functions, structs, transformer): + """ + + :param skip: + :param overwrite: + :param functions: + :param structs: + :param transformer: + :return: + """ + tasks = [] + for name in [transformer.function_names, transformer.parameter_names]: + file = self.output_directory.joinpath(name) + if name == transformer.function_names: + data = transformer.get_function_names(functions) + elif name == transformer.parameter_names: + data = transformer.get_simple_params(functions, structs) + else: + self.logger.error('No "data" for %s', name) + continue + for extension in ('.h', '.m'): + templates = [name + extension] + file_with_suffix = file.with_suffix(extension) + tasks.append(self.process_common(skip, overwrite, file_with_suffix, data, templates)) + + await asyncio.gather(*tasks) + + async def process_common(self, skip, overwrite, file_with_suffix, data, templates): + """ + + :param skip: + :param overwrite: + :param file_with_suffix: + :param data: + :param templates: + :return: + """ + if file_with_suffix.is_file(): + if skip: + self.logger.info('Skipping %s', file_with_suffix.name) + return + if overwrite: + self.logger.info('Overriding %s', file_with_suffix.name) + self.write_file(file_with_suffix, templates, data) + else: + while True: + try: + confirm = input('File already exists {}. Overwrite? Y/Enter = yes, N = no\n' + .format(file_with_suffix.name)) + if confirm.lower() == 'y' or not confirm: + self.logger.info('Overriding %s', file_with_suffix.name) + self.write_file(file_with_suffix, templates, data) + break + if confirm.lower() == 'n': + self.logger.info('Skipping %s', file_with_suffix.name) + break + except KeyboardInterrupt: + print('\nThe user interrupted the execution of the program') + sys.exit(1) + else: + self.logger.info('Writing new %s', file_with_suffix.name) + self.write_file(file_with_suffix, templates, data) + + @staticmethod + def filter_pattern(interface, pattern): + """ + + :param interface: + :param pattern: + :return: + """ + names = tuple(interface.enums.keys()) + tuple(interface.structs.keys()) + + if pattern: + match = {key: OrderedDict() for key in vars(interface).keys()} + match['params'] = interface.params + for key, value in vars(interface).items(): + if key == 'params': + continue + match[key].update({name: item for name, item in value.items() if re.match(pattern, item.name)}) + return Interface(**match), names + return interface, names + + async def parser(self, source_xml, source_xsd): + """ + + :param source_xml: + :param source_xsd: + :return: + """ + try: + start = datetime.now() + model = self.loop.run_in_executor(None, Parser().parse, source_xml, source_xsd) + model = await model + self.logger.debug('finish getting model in %s milisec,', (datetime.now() - start).microseconds / 1000.0) + return model + except ParseError as error1: + self.logger.error(error1) + sys.exit(1) + + async def get_all_async(self, source_xml, source_xsd): + """ + + :param source_xml: + :param source_xsd: + :return: + """ + return await asyncio.gather(self.parser(source_xml, source_xsd), self.get_paths(), self.get_mappings()) + + def main(self): + """ + Entry point for parser and generator + :return: None + """ + args = self.get_parser() + self.config_logging(args.verbose) + self.versions_compatibility_validating() + self.output_directory = args.output_directory + + interface, paths, mappings = self.loop.run_until_complete(self.get_all_async(args.source_xml, args.source_xsd)) + + self.env = [args.templates_directory] + [join(args.templates_directory, k) for k in vars(interface).keys()] + + filtered, names = self.filter_pattern(interface, args.regex_pattern) + + tasks = [] + functions_transformer = FunctionsProducer(paths, names, mappings) + if args.enums and filtered.enums: + tasks.append(self.process_main(args.skip, args.overwrite, filtered.enums, + EnumsProducer(paths.enum_class, mappings))) + if args.structs and filtered.structs: + tasks.append(self.process_main(args.skip, args.overwrite, filtered.structs, + StructsProducer(paths.struct_class, names, mappings))) + if args.functions and filtered.functions: + tasks.append(self.process_main(args.skip, args.overwrite, filtered.functions, functions_transformer)) + tasks.append(self.process_function_name(args.skip, args.overwrite, interface.functions, + interface.structs, functions_transformer)) + if tasks: + self.loop.run_until_complete(asyncio.wait(tasks)) + else: + self.logger.warning('Nothing matched with "%s"', args.regex_pattern) + + self.loop.close() + + +if __name__ == '__main__': + Generator().main() |