author     Aleksandr Mishchenko <AMishchenko@luxoft.com>  2020-01-15 19:37:22 +0200
committer  Vladyslav Mustafin <vmustafin@luxoft.com>      2020-02-19 18:45:58 +0200
commit     ce593107c5fda551012c5e5a4698c1ceaef7034c (patch)
tree       5430e7250c898bd44f47dc63971fd6520bfdebd1 /generator/generator.py
parent     9e389ad7668b8ac5d47873f26cf314459a670d15 (diff)
download   sdl_ios-ce593107c5fda551012c5e5a4698c1ceaef7034c.tar.gz
SDL-0234 Proxy Library RPC Generation
Diffstat (limited to 'generator/generator.py')
-rw-r--r--  generator/generator.py  479
1 file changed, 479 insertions, 0 deletions
diff --git a/generator/generator.py b/generator/generator.py
new file mode 100644
index 000000000..1b2a06aa4
--- /dev/null
+++ b/generator/generator.py
@@ -0,0 +1,479 @@
+"""
+Generator
+"""
+import asyncio
+import json
+import logging
+import re
+import sys
+from argparse import ArgumentParser
+from collections import namedtuple, OrderedDict
+from datetime import datetime, date
+from inspect import getfile
+from json import JSONDecodeError
+from os.path import basename, join
+from re import findall
+
+from jinja2 import UndefinedError, TemplateNotFound, FileSystemLoader, Environment, ChoiceLoader, \
+ TemplateAssertionError, TemplateSyntaxError, TemplateRuntimeError
+from pathlib2 import Path
+
+ROOT = Path(__file__).absolute().parents[0]
+
+sys.path.append(ROOT.joinpath('rpc_spec/InterfaceParser').as_posix())
+
+try:
+ from parsers.rpc_base import ParseError
+ from parsers.sdl_rpc_v2 import Parser
+ from model.interface import Interface
+ from transformers.common_producer import InterfaceProducerCommon as Common
+ from transformers.enums_producer import EnumsProducer
+ from transformers.functions_producer import FunctionsProducer
+ from transformers.structs_producer import StructsProducer
+except ImportError as error:
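+    # A missing git submodule is the usual cause; it can typically be fetched with, e.g.:
+    #   git submodule update --init --recursive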
+    print('{}.\nProbably you did not initialize the submodule'.format(error))
+ ParseError = Parser = Interface = Common = EnumsProducer = FunctionsProducer = StructsProducer = None
+ sys.exit(1)
+
+
+class Generator:
+ """
+    This class contains only technical features, as follows:
+    - parsing command-line arguments, or evaluating the required container interactively;
+    - calling parsers to get the Model from the XML;
+    - calling producers to transform the initial Model into the dict used in the jinja2 templates.
+    Not required to be covered by unit tests because it contains only technical features.
+ """
+
+ def __init__(self):
+ self.logger = logging.getLogger(self.__class__.__name__)
+ self._env = None
+ self._output_directory = None
+ self.loop = asyncio.get_event_loop()
+ self.paths_named = namedtuple('paths_named', 'enum_class struct_class request_class response_class '
+ 'notification_class function_names parameter_names')
+
+ @property
+ def output_directory(self) -> Path:
+ """
+
+ :return:
+ """
+ return self._output_directory
+
+ @output_directory.setter
+ def output_directory(self, output_directory):
+ """
+
+ :param output_directory:
+ :return:
+ """
+ if output_directory.startswith('/'):
+ path = Path(output_directory).absolute().resolve()
+ else:
+ path = ROOT.joinpath(output_directory).resolve()
+ if not path.exists():
+ self.logger.warning('Directory not found: %s, trying to create it', path)
+ try:
+ path.mkdir(parents=True, exist_ok=True)
+ except OSError as message1:
+ self.logger.critical('Failed to create directory %s, %s', path.as_posix(), message1)
+ sys.exit(1)
+ self._output_directory = path
+
+ @property
+ def env(self) -> Environment:
+ """
+
+ :return:
+ """
+ return self._env
+
+ @env.setter
+ def env(self, paths):
+ """
+
+ :param paths:
+ :return:
+ """
+        loaders = list(filter(lambda template_dir: Path(template_dir).exists(), paths))
+ if not loaders:
+ self.logger.error('Directory with templates not found %s', str(paths))
+ sys.exit(1)
+        loaders = [FileSystemLoader(template_dir) for template_dir in loaders]
+
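+        # ChoiceLoader tries the directories in order, so earlier paths win on template name clashes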
+ self._env = Environment(loader=ChoiceLoader(loaders))
+ self._env.filters['title'] = self.title
+ self._env.globals['year'] = date.today().year
+
+ @staticmethod
+ def title(name):
+ """
+
+ :param name:
+ :return:
+ """
+ return name[:1].upper() + name[1:]
+
+ @property
+ def get_version(self):
+ """
+
+ :return:
+ """
+ return Common.version
+
+ def config_logging(self, verbose):
+ """
+
+ :param verbose:
+ :return:
+ """
+ handler = logging.StreamHandler()
+ handler.setFormatter(logging.Formatter(fmt='%(asctime)s.%(msecs)03d - %(levelname)s - %(message)s',
+ datefmt='%H:%M:%S'))
+ root_logger = logging.getLogger()
+
+ if verbose:
+ handler.setLevel(logging.DEBUG)
+ self.logger.setLevel(logging.DEBUG)
+ root_logger.setLevel(logging.DEBUG)
+ else:
+ handler.setLevel(logging.ERROR)
+ self.logger.setLevel(logging.ERROR)
+ root_logger.setLevel(logging.ERROR)
+        root_logger.handlers.clear()
+ root_logger.addHandler(handler)
+
+ def get_parser(self):
+ """
+        Parse command-line arguments, or ask interactively for the required container values.
+        :return: the parsed arguments (argparse.Namespace)
+ """
+ if len(sys.argv) == 2 and sys.argv[1] in ('-v', '--version'):
+ print(self.get_version)
+ sys.exit(0)
+
+ container = namedtuple('container', 'name path')
+ xml = container('source_xml', ROOT.joinpath('rpc_spec/MOBILE_API.xml'))
+ required_source = not xml.path.exists()
+
+ out = container('output_directory', ROOT.parents[0].joinpath('SmartDeviceLink'))
+ output_required = not out.path.exists()
+
+ parser = ArgumentParser(description='Proxy Library RPC Generator')
+        parser.add_argument('-v', '--version', action='store_true', help='print the version and exit')
+ parser.add_argument('-xml', '--source-xml', '--input-file', required=required_source,
+ help='should point to MOBILE_API.xml')
+ parser.add_argument('-xsd', '--source-xsd', required=False)
+ parser.add_argument('-d', '--output-directory', required=output_required,
+ help='define the place where the generated output should be placed')
+ parser.add_argument('-t', '--templates-directory', nargs='?', default=ROOT.joinpath('templates').as_posix(),
+ help='path to directory with templates')
+ parser.add_argument('-r', '--regex-pattern', required=False,
+ help='only elements matched with defined regex pattern will be parsed and generated')
+ parser.add_argument('--verbose', action='store_true', help='display additional details like logs etc')
+ parser.add_argument('-e', '--enums', required=False, action='store_true',
+ help='only specified elements will be generated, if present')
+ parser.add_argument('-s', '--structs', required=False, action='store_true',
+ help='only specified elements will be generated, if present')
+ parser.add_argument('-m', '-f', '--functions', required=False, action='store_true',
+ help='only specified elements will be generated, if present')
+ parser.add_argument('-y', '--overwrite', action='store_true',
+ help='force overwriting of existing files in output directory, ignore confirmation message')
+ parser.add_argument('-n', '--skip', action='store_true',
+ help='skip overwriting of existing files in output directory, ignore confirmation message')
+
+ args, unknown = parser.parse_known_args()
+
+ if unknown:
+ print('found unknown arguments: ' + ' '.join(unknown))
+ parser.print_help(sys.stderr)
+ sys.exit(1)
+
+ if args.skip and args.overwrite:
+            print('please select only one option: skip "-n" or overwrite "-y"')
+ sys.exit(1)
+
+ if not args.enums and not args.structs and not args.functions:
+ args.enums = args.structs = args.functions = True
+
+ for kind in (xml, out):
+ if not getattr(args, kind.name) and kind.path.exists():
+ while True:
+ try:
+                    confirm = input('Confirm default path {} for {}: Y/Enter = yes, N = no '
+ .format(kind.path, kind.name))
+ if confirm.lower() == 'y' or not confirm:
+ print('{} set to {}'.format(kind.name, kind.path))
+ setattr(args, kind.name, kind.path.as_posix())
+ break
+ if confirm.lower() == 'n':
+ print('provide argument ' + kind.name)
+ sys.exit(1)
+ except KeyboardInterrupt:
+ print('\nThe user interrupted the execution of the program')
+ sys.exit(1)
+
+ self.logger.debug('parsed arguments:\n%s', vars(args))
+
+ return args
+
+ def versions_compatibility_validating(self):
+ """version of generator script requires the same or lesser version of parser script.
+ if the parser script needs to fix a bug (and becomes, e.g. 1.0.1) and the generator script stays at 1.0.0.
+ As long as the generator script is the same or greater major version, it should be parsable.
+ This requires some level of backward compatibility. E.g. they have to be the same major version.
+
+ """
+
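+        # e.g. Parser 1.0.1 with Generator 1.0.0 is accepted, while a Parser version greater
+        # than the Generator's (say 2.0 vs 1.0) aborts the run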
+        regex = r'(\d+\.\d+)\.(\d+)'
+
+ parser_origin = Parser().get_version
+ generator_origin = self.get_version
+ parser_split = findall(regex, parser_origin).pop()
+ generator_split = findall(regex, generator_origin).pop()
+
+ parser_major = float(parser_split[0])
+ generator_major = float(generator_split[0])
+
+ if parser_major > generator_major:
+ self.logger.critical('Generator (%s) requires the same or lesser version of Parser (%s)',
+ generator_origin, parser_origin)
+ sys.exit(1)
+
+ self.logger.info('Parser type: %s, version %s,\tGenerator version %s',
+ basename(getfile(Parser().__class__)), parser_origin, generator_origin)
+
+ async def get_paths(self, file_name=ROOT.joinpath('paths.ini')):
+ """
+        :param file_name: path to the file with the key class names
+        :return: namedtuple with the names of the key elements
+ """
+ data = OrderedDict()
+ try:
+ with file_name.open('r') as file:
+ for line in file:
+ if line.startswith('#'):
+ self.logger.warning('commented property %s, which will be skipped', line.strip())
+ continue
+ if re.match(r'^(\w+)\s?=\s?(.+)', line):
+ if len(line.split('=')) > 2:
+                        self.logger.critical('cannot evaluate value, too many separators in %s', str(line))
+ sys.exit(1)
+ name, var = line.partition('=')[::2]
+ if name.strip() in data:
+ self.logger.critical('duplicate key %s', name)
+ sys.exit(1)
+ data[name.strip().lower()] = var.strip()
+ except FileNotFoundError as message1:
+ self.logger.critical(message1)
+ sys.exit(1)
+
+ missed = list(set(self.paths_named._fields) - set(data.keys()))
+ if missed:
+            self.logger.critical('missing fields in %s: %s', file_name, str(missed))
+ sys.exit(1)
+
+ return self.paths_named(**data)
+
+ async def get_mappings(self, file=ROOT.joinpath('mapping.json')):
+ """
+        The key names in *.json are equal to the property names used in the jinja2 templates
+ :param file: path to file with manual mappings
+ :return: dictionary with custom manual mappings
+ """
+ try:
+ with file.open('r') as handler:
+                return json.load(handler)
+ except (FileNotFoundError, JSONDecodeError) as error1:
+ self.logger.error('Failure to get mappings %s', error1)
+ return OrderedDict()
+
+ def write_file(self, file, templates, data):
+ """
+        Applying the dict produced by a producer/transformer to the jinja2 template(s)
+        and writing the rendered result to the appropriate file.
+        :param file: output file path
+        :param templates: list of template names; the first existing one is used
+        :param data: transformed dict for a particular item
+ """
+ try:
+ render = self.env.get_or_select_template(templates).render(data)
+ with file.open('w', encoding='utf-8') as handler:
+ handler.write(render)
+ except (TemplateNotFound, UndefinedError, TemplateAssertionError, TemplateSyntaxError, TemplateRuntimeError) \
+ as error1:
+            self.logger.error('skipping %s, template error: %s', file.as_posix(), error1)
+
+ async def process_main(self, skip, overwrite, items, transformer):
+ """
+        Process each item of the initial Model; depending on the provided arguments, existing files
+        are skipped, overwritten, or the user is asked what to do.
+        :param skip: if the file exists, skip it
+        :param overwrite: if the file exists, overwrite it
+        :param items: elements of the initial Model
+ :param transformer: producer/transformer instance
+ """
+ tasks = []
+ for item in items.values():
+ render = transformer.transform(item)
+ file = self.output_directory.joinpath(render.get('name', item.name))
+ for extension in ('.h', '.m'):
+ data = render.copy()
+ data['imports'] = data['imports'][extension]
+ file_with_suffix = file.with_suffix(extension)
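+            # e.g. for a model class named Enum this resolves to 'enums/template.h' / 'enums/template.m',
+            # unless the producer supplied an explicit 'template' name (handled below)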
+ templates = ['{}s/template{}'.format(type(item).__name__.lower(), extension)]
+ if 'template' in data:
+ templates.insert(0, data['template'] + extension)
+ tasks.append(self.process_common(skip, overwrite, file_with_suffix, data, templates))
+
+ await asyncio.gather(*tasks)
+
+ async def process_function_name(self, skip, overwrite, functions, structs, transformer):
+ """
+        Generate the additional function-name and parameter-name files.
+        :param skip: if the file exists, skip it
+        :param overwrite: if the file exists, overwrite it
+        :param functions: functions of the initial Model
+        :param structs: structs of the initial Model
+        :param transformer: producer/transformer instance (FunctionsProducer)
+ """
+ tasks = []
+ for name in [transformer.function_names, transformer.parameter_names]:
+ file = self.output_directory.joinpath(name)
+ if name == transformer.function_names:
+ data = transformer.get_function_names(functions)
+ elif name == transformer.parameter_names:
+ data = transformer.get_simple_params(functions, structs)
+ else:
+ self.logger.error('No "data" for %s', name)
+ continue
+ for extension in ('.h', '.m'):
+ templates = [name + extension]
+ file_with_suffix = file.with_suffix(extension)
+ tasks.append(self.process_common(skip, overwrite, file_with_suffix, data, templates))
+
+ await asyncio.gather(*tasks)
+
+ async def process_common(self, skip, overwrite, file_with_suffix, data, templates):
+ """
+        Write a single output file, honoring the skip/overwrite flags and asking the user when neither is set.
+        :param skip: if the file exists, skip it
+        :param overwrite: if the file exists, overwrite it
+        :param file_with_suffix: full path of the output file
+        :param data: transformed dict for a particular item
+        :param templates: list of template names
+ """
+ if file_with_suffix.is_file():
+ if skip:
+ self.logger.info('Skipping %s', file_with_suffix.name)
+ return
+ if overwrite:
+                self.logger.info('Overwriting %s', file_with_suffix.name)
+ self.write_file(file_with_suffix, templates, data)
+ else:
+ while True:
+ try:
+ confirm = input('File already exists {}. Overwrite? Y/Enter = yes, N = no\n'
+ .format(file_with_suffix.name))
+ if confirm.lower() == 'y' or not confirm:
+                        self.logger.info('Overwriting %s', file_with_suffix.name)
+ self.write_file(file_with_suffix, templates, data)
+ break
+ if confirm.lower() == 'n':
+ self.logger.info('Skipping %s', file_with_suffix.name)
+ break
+ except KeyboardInterrupt:
+ print('\nThe user interrupted the execution of the program')
+ sys.exit(1)
+ else:
+ self.logger.info('Writing new %s', file_with_suffix.name)
+ self.write_file(file_with_suffix, templates, data)
+
+ @staticmethod
+ def filter_pattern(interface, pattern):
+ """
+        Filter the parsed Interface with the regex pattern, if one was provided.
+        :param interface: the parsed Interface (Model)
+        :param pattern: regex pattern from the command line, may be empty
+        :return: tuple of (possibly filtered Interface, names of all enums and structs)
+ """
+ names = tuple(interface.enums.keys()) + tuple(interface.structs.keys())
+
+ if pattern:
+ match = {key: OrderedDict() for key in vars(interface).keys()}
+ match['params'] = interface.params
+ for key, value in vars(interface).items():
+ if key == 'params':
+ continue
+ match[key].update({name: item for name, item in value.items() if re.match(pattern, item.name)})
+ return Interface(**match), names
+ return interface, names
+
+ async def parser(self, source_xml, source_xsd):
+ """
+        Run the XML parser in an executor and return the initial Model.
+        :param source_xml: path to MOBILE_API.xml
+        :param source_xsd: optional path to the source XSD
+        :return: the parsed Interface (Model)
+ """
+ try:
+ start = datetime.now()
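+            # run the blocking XML parse in the default thread-pool executor so the event loop stays responsive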
+ model = self.loop.run_in_executor(None, Parser().parse, source_xml, source_xsd)
+ model = await model
+            self.logger.debug('finished getting model in %s ms', (datetime.now() - start).microseconds / 1000.0)
+ return model
+ except ParseError as error1:
+ self.logger.error(error1)
+ sys.exit(1)
+
+ async def get_all_async(self, source_xml, source_xsd):
+ """
+        Run XML parsing, paths.ini reading and mapping.json reading concurrently.
+        :param source_xml: path to MOBILE_API.xml
+        :param source_xsd: optional path to the source XSD
+        :return: tuple of (Model, paths namedtuple, mappings dict)
+ """
+ return await asyncio.gather(self.parser(source_xml, source_xsd), self.get_paths(), self.get_mappings())
+
+ def main(self):
+ """
+ Entry point for parser and generator
+ :return: None
+ """
+ args = self.get_parser()
+ self.config_logging(args.verbose)
+ self.versions_compatibility_validating()
+ self.output_directory = args.output_directory
+
+ interface, paths, mappings = self.loop.run_until_complete(self.get_all_async(args.source_xml, args.source_xsd))
+
+ self.env = [args.templates_directory] + [join(args.templates_directory, k) for k in vars(interface).keys()]
+
+ filtered, names = self.filter_pattern(interface, args.regex_pattern)
+
+ tasks = []
+ functions_transformer = FunctionsProducer(paths, names, mappings)
+ if args.enums and filtered.enums:
+ tasks.append(self.process_main(args.skip, args.overwrite, filtered.enums,
+ EnumsProducer(paths.enum_class, mappings)))
+ if args.structs and filtered.structs:
+ tasks.append(self.process_main(args.skip, args.overwrite, filtered.structs,
+ StructsProducer(paths.struct_class, names, mappings)))
+ if args.functions and filtered.functions:
+ tasks.append(self.process_main(args.skip, args.overwrite, filtered.functions, functions_transformer))
+ tasks.append(self.process_function_name(args.skip, args.overwrite, interface.functions,
+ interface.structs, functions_transformer))
+ if tasks:
+ self.loop.run_until_complete(asyncio.wait(tasks))
+ else:
+ self.logger.warning('Nothing matched with "%s"', args.regex_pattern)
+
+ self.loop.close()
+
+
+if __name__ == '__main__':
+ Generator().main()