#!/usr/bin/env python
"""
Utility for parsing an AMQP XML spec file
and generating a Python module skeleton.

This is a fairly ugly program, but it's only intended
to be run once.

2007-11-10 Barry Pederson

"""
# Copyright (C) 2007 Barry Pederson
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA

import sys
import textwrap
from optparse import OptionParser
from xml.etree import ElementTree


#########
#
# Helper code inspired by http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/498286
# described in http://www.agapow.net/programming/python/the-etree-tail-quirk
#
def _textlist(self, _addtail=False):
    """
    Return a list of text strings collected from an element.

    The list holds the element's own leading text followed by the tail
    text of each descendant, in document order.  Helpful for extracting
    text from prose-oriented XML (such as XHTML or DocBook).

    `self` is the element to read (the name is kept because the function
    is also injected as an element method).  `_addtail` is internal to the
    recursion: when true the element's own text is skipped and its tail
    is appended instead.

    NOTE(review): text placed *inside* a sub-element is not collected,
    only the tail that follows it - confirm this is intended.

    """
    result = []
    if (not _addtail) and (self.text is not None):
        result.append(self.text)
    for elem in self:
        # Recurse through the function itself rather than elem.textlist()
        # so that this works even when the injection below is skipped.
        result.extend(_textlist(elem, True))
    if _addtail and self.tail is not None:
        result.append(self.tail)
    return result


# Inject the new method into the ElementTree framework so that elements
# can also be asked for element.textlist().  Where ElementTree has no
# `_Element` attribute (AttributeError) or the class refuses new
# attributes (TypeError) the injection is skipped; _textlist() itself
# does not depend on it.
try:
    ElementTree._Element.textlist = _textlist
except (AttributeError, TypeError):
    pass

#
#
#########

# Maps a spec domain name to its type name; filled in by generate_module().
domains = {}

# Maps (class name, method name) to a tuple of
# (class index, method index, 'Handler.method_name');
# filled in by generate_module().
method_name_map = {}


def _fixup_method_name(class_element, method_element):
    """
    Return the Python method name to use for an AMQP method element.

    Dashes become underscores.  When the class is handled by a different
    class (its 'handler' attribute differs from its 'name'), the class
    name is prepended so the names stay distinct inside the handler.

    """
    class_name = class_element.attrib['name']
    if class_name != class_element.attrib['handler']:
        prefix = '%s_' % class_name
    else:
        prefix = ''
    return ('%s%s' % (prefix, method_element.attrib['name'])).replace('-', '_')


def _fixup_field_name(field_element):
    """
    Return the Python identifier to use for an AMQP field element.

    Spaces become underscores, and 'global' (a Python keyword) is
    renamed to 'a_global'.

    """
    result = field_element.attrib['name'].replace(' ', '_')
    if result == 'global':
        result = 'a_global'
    return result


def _field_type(field_element):
    """
    Return the type name of an AMQP field element.

    An explicit 'type' attribute wins; otherwise the 'domain' attribute
    is looked up in the module-level `domains` table (KeyError if the
    domain is unknown).  Returns None when the field has neither.

    """
    attrib = field_element.attrib
    if 'type' in attrib:
        return attrib['type']
    if 'domain' in attrib:
        return domains[attrib['domain']]
    return None


def _reindent(s, indent, reformat=True):
    """
    Remove the existing indentation from each line of a chunk of text, s,
    and then prefix each line with a new indent string.

    Also removes trailing whitespace from each line, and leading and
    trailing blank lines.

    When `reformat` is true the text is re-wrapped with textwrap.wrap();
    otherwise the original line breaks are kept.  The result always ends
    with a single newline.

    """
    lines = [line.rstrip() for line in textwrap.dedent(s).split('\n')]

    while lines and (not lines[0]):
        lines.pop(0)

    while lines and (not lines[-1]):
        lines.pop()

    if reformat:
        lines = textwrap.wrap('\n'.join(lines),
                              initial_indent=indent,
                              subsequent_indent=indent)
    else:
        # rstrip again so that blank interior lines don't end up holding
        # nothing but the (whitespace) indent.
        lines = [(indent + line).rstrip() for line in lines]

    return '\n'.join(lines) + '\n'


def generate_docstr(element, indent='', wrap=None):
    """
    Generate a Python docstr for a given element in the AMQP XML spec file.

    The element could be a class or method (or, recursively, a field).

    The text is assembled from the element's own leading text, its 'doc'
    and 'rule' children, and a PARAMETERS section listing its 'field'
    children.  Every line is prefixed with `indent`.

    The 'wrap' parameter is an optional chunk of text that's added to the
    beginning and end of the resulting docstring.

    Returns the text ending in a newline, or None if the element has
    nothing to document.

    """
    result = []

    txt = element.text and element.text.rstrip()
    if txt:
        result.append(_reindent(txt, indent))
        result.append(indent)

    # Once a RULE or a named doc has been seen, this extra indent stays
    # in effect for the remaining docs of this element.
    extra_indent = ''

    for d in element.findall('doc') + element.findall('rule'):
        if d.tag == 'rule':
            rule_docs = d.findall('doc')
            if not rule_docs:
                # A rule without a nested <doc> has nothing to render.
                continue
            result.append(indent + 'RULE:')
            result.append(indent)
            extra_indent = '    '
            d = rule_docs[0]

        # _textlist is the module-level helper that is also injected
        # into elements as .textlist().
        docval = ''.join(_textlist(d)).rstrip()
        if not docval:
            continue

        reformat = True
        if 'name' in d.attrib:
            result.append(indent + d.attrib['name'].upper() + ':')
            result.append(indent)
            extra_indent = '    '
            if d.attrib['name'] == 'grammar':
                reformat = False    # Don't want re-indenting to mess this up

        result.append(_reindent(docval, indent + extra_indent, reformat))
        result.append(indent)

    fields = element.findall('field')
    if fields:
        result.append(indent + 'PARAMETERS:')
        for f in fields:
            result.append(indent + '    ' + _fixup_field_name(f) + ': ' + _field_type(f))
            field_docs = generate_docstr(f, indent + '        ')
            if field_docs:
                result.append(indent)
                result.append(field_docs)
        result.append(indent)

    if not result:
        return None

    if wrap is not None:
        result = [wrap] + result + [wrap]

    return '\n'.join(x.rstrip() for x in result) + '\n'


def _write_server_method(class_element, amqp_method, fields, fieldnames, out):
    """
    Write the method stub that sends `amqp_method` as a method frame.

    `fields` are the method's field elements in spec order (the order the
    arguments are written to the AMQPWriter); `fieldnames` are the Python
    parameter names in the order used for the generated signature.

    """
    params = ['self']
    if 'content' in amqp_method.attrib:
        params.append('msg')

    out.write('    def %s(%s):\n' % (
        _fixup_method_name(class_element, amqp_method),
        ', '.join(params + fieldnames)))

    s = generate_docstr(amqp_method, '        ', '        """')
    if s:
        out.write(s)

    if fields:
        out.write('        args = AMQPWriter()\n')
        smf_arg = ', args'
    else:
        smf_arg = ''
    for f in fields:
        out.write('        args.write_%s(%s)\n' % (_field_type(f), _fixup_field_name(f)))

    # Methods of the 'connection' class get an extra leading 0 argument
    # in the generated send_method_frame() call.
    if class_element.attrib['name'] == 'connection':
        smf_pattern = '        self.send_method_frame(0, (%s, %s)%s)\n'
    else:
        smf_pattern = '        self.send_method_frame((%s, %s)%s)\n'

    out.write(smf_pattern % (
        class_element.attrib['index'], amqp_method.attrib['index'], smf_arg))

    if 'synchronous' in amqp_method.attrib:
        responses = [x.attrib['name'] for x in amqp_method.findall('response')]
        out.write('        return self.wait(allowed_methods=[\n')
        for r in responses:
            resp = method_name_map[(class_element.attrib['name'], r)]
            out.write('            (%s, %s),  # %s\n' % resp)
        out.write('        ])\n')

    out.write('\n\n')


def _write_client_handler(class_element, amqp_method, fields, out):
    """
    Write the '_'-prefixed stub that receives `amqp_method` and reads
    its fields from `args`.

    """
    out.write('    def _%s(self, args):\n' % _fixup_method_name(class_element, amqp_method))

    s = generate_docstr(amqp_method, '        ', '        """')
    if s:
        out.write(s)

    # The generated body needs a 'pass' unless some statement is emitted.
    need_pass = True
    for f in fields:
        out.write('        %s = args.read_%s()\n' % (_fixup_field_name(f), _field_type(f)))
        need_pass = False
    if 'content' in amqp_method.attrib:
        out.write('        msg = self.wait()\n')
        need_pass = False
    if need_pass:
        out.write('        pass\n')

    out.write('\n\n')


def generate_methods(class_element, out):
    """
    Write method stubs for every 'method' child of an AMQP class element,
    sorted by method name, to the file-like object `out`.

    A method whose 'chassis' children include 'server' gets a sending
    stub; one whose 'chassis' children include 'client' gets a '_'-prefixed
    receiving stub.  A method may get both.

    """
    methods = sorted(class_element.findall('method'), key=lambda x: x.attrib['name'])

    for amqp_method in methods:
        fields = amqp_method.findall('field')
        fieldnames = [_fixup_field_name(x) for x in fields]

        # move any 'ticket' arguments to the end of the method declaration
        # so that they can have a default value.  (No default is written
        # here; only the ordering is arranged.)
        if 'ticket' in fieldnames:
            fieldnames = [x for x in fieldnames if x != 'ticket'] + ['ticket']

        chassis = [x.attrib['name'] for x in amqp_method.findall('chassis')]

        if 'server' in chassis:
            _write_server_method(class_element, amqp_method, fields, fieldnames, out)

        if 'client' in chassis:
            _write_client_handler(class_element, amqp_method, fields, out)


def generate_class(spec, class_element, out):
    """
    Write a Python class skeleton for an AMQP class element.

    The class contains the stubs for its own methods followed by, under
    a comment banner each, the stubs for every other class in `spec`
    whose 'handler' attribute names this class.

    """
    class_name = class_element.attrib['name']

    out.write('class %s(object):\n' % class_name.capitalize())
    s = generate_docstr(class_element, '    ', '    """')
    if s:
        out.write(s)

    generate_methods(class_element, out)

    #
    # Generate methods for handled classes
    #
    for amqp_class in spec.findall('class'):
        if amqp_class.attrib['handler'] != class_name:
            continue
        if amqp_class.attrib['name'] == class_name:
            continue

        out.write('    #############\n')
        out.write('    #\n')
        out.write('    # %s\n' % amqp_class.attrib['name'].capitalize())
        out.write('    #\n')
        # The handled class's documentation is written as '#' comments.
        s = generate_docstr(amqp_class, '    # ', '    # ')
        if s:
            out.write(s)
        out.write('\n')

        generate_methods(amqp_class, out)


def generate_module(spec, out):
    """
    Given an AMQP spec parsed into an xml.etree.ElementTree,
    and a file-like 'out' object to write to, generate
    the skeleton of a Python module.

    Side effects: sets the 'handler' attribute of the 'access' class in
    `spec`, and fills the module-level `domains` and `method_name_map`
    tables that the other generate_* functions read.

    """
    #
    # HACK THE SPEC so that 'access' is handled by 'channel' instead of 'connection'
    #
    for amqp_class in spec.findall('class'):
        if amqp_class.attrib['name'] == 'access':
            amqp_class.attrib['handler'] = 'channel'

    #
    # Build up some helper dictionaries
    #
    for domain in spec.findall('domain'):
        domains[domain.attrib['name']] = domain.attrib['type']

    for amqp_class in spec.findall('class'):
        for amqp_method in amqp_class.findall('method'):
            key = (amqp_class.attrib['name'], amqp_method.attrib['name'])
            method_name_map[key] = (
                amqp_class.attrib['index'],
                amqp_method.attrib['index'],
                amqp_class.attrib['handler'].capitalize() + '.' +
                    _fixup_method_name(amqp_class, amqp_method),
            )

    #### Actually generate output

    # Only classes that handle themselves become Python classes; the
    # others are folded into their handler by generate_class().
    for amqp_class in spec.findall('class'):
        if amqp_class.attrib['handler'] == amqp_class.attrib['name']:
            generate_class(spec, amqp_class, out)

    # (class index, method index) -> (handler class, receiving stub)
    out.write('_METHOD_MAP = {\n')
    for amqp_class in spec.findall('class'):
        handler = amqp_class.attrib['handler'].capitalize()
        for amqp_method in amqp_class.findall('method'):
            chassis = [x.attrib['name'] for x in amqp_method.findall('chassis')]
            if 'client' in chassis:
                out.write("    (%s, %s): (%s, %s._%s),\n" % (
                    amqp_class.attrib['index'],
                    amqp_method.attrib['index'],
                    handler,
                    handler,
                    _fixup_method_name(amqp_class, amqp_method)))
    out.write('}\n\n')

    # (class index, method index) -> 'Handler.method_name' for every method
    out.write('_METHOD_NAME_MAP = {\n')
    for amqp_class in spec.findall('class'):
        handler = amqp_class.attrib['handler'].capitalize()
        for amqp_method in amqp_class.findall('method'):
            out.write("    (%s, %s): '%s.%s',\n" % (
                amqp_class.attrib['index'],
                amqp_method.attrib['index'],
                handler,
                _fixup_method_name(amqp_class, amqp_method)))
    out.write('}\n')


def main(argv=None):
    """
    Command-line entry point.

    `argv` defaults to sys.argv.  argv[1] is the AMQP spec XML file to
    parse; the optional argv[2] is the file to write the generated
    module to (standard output when omitted).

    Returns 1 after printing a usage message when no spec file is given,
    0 otherwise.

    """
    if argv is None:
        argv = sys.argv

    if len(argv) < 2:
        # Parenthesised single-argument form prints the same text whether
        # print is a statement or a function.
        print('Usage: %s <amqp-spec-xml-file> [<output-file>]' % argv[0])
        return 1

    spec = ElementTree.parse(argv[1])

    if len(argv) < 3:
        generate_module(spec, sys.stdout)
        return 0

    out = open(argv[2], 'w')
    try:
        generate_module(spec, out)
    finally:
        # Make sure the generated module is flushed and the handle is
        # released even if generation fails part-way.
        out.close()
    return 0


# Run the generator only when executed as a script; the value returned by
# main() is passed to sys.exit() as the process exit status.
if __name__ == '__main__':
    sys.exit(main())