#!/usr/bin/python3

# Copyright (C) 2008 OpenedHand Ltd
# Copyright (C) 2008 Intel Corporation
#
# SPDX-License-Identifier: GPL-2.0-or-later

# TODO:
#  - finish code cleanup
#  - currently allowedValueList is not used: could use it to turn
#    current char* value to an enum
#  - could warn if values outside allowedValueRange are used in *_action_set()
#  - add option to generate skeleton source code that uses the bindings?

"""Generate C convenience bindings from a UPnP service description (SCPD).

The SCPD XML file is parsed into Action and Variable objects, and a C header
with either client-side or server-side wrappers is printed to stdout.
"""

import os.path
import re
import xml.etree.ElementTree as ET
from optparse import OptionParser

# XML namespace (in ElementTree "{uri}" form) of every SCPD element we read.
_NS = "{urn:schemas-upnp-org:service-1-0}"

# upnp type: (C type, GType, type for g_value_get_* and g_value_set_*)
#
# The C type strings carry their own trailing separator (a space after value
# types, nothing after the '*' of pointer types) because the generators below
# concatenate them directly with the identifier that follows.
typemap = {
    'ui1': ('guint ', 'G_TYPE_UINT', 'uint'),
    'ui2': ('guint ', 'G_TYPE_UINT', 'uint'),
    'ui4': ('guint ', 'G_TYPE_UINT', 'uint'),
    'i1': ('gint ', 'G_TYPE_INT', 'int'),
    'i2': ('gint ', 'G_TYPE_INT', 'int'),
    'i4': ('gint ', 'G_TYPE_INT', 'int'),
    'int': ('gint ', 'G_TYPE_INT', 'int'),
    'r4': ('gfloat ', 'G_TYPE_FLOAT', 'float'),
    'r8': ('gdouble ', 'G_TYPE_DOUBLE', 'double'),
    'number': ('gdouble ', 'G_TYPE_DOUBLE', 'double'),
    'fixed.14.4': ('gdouble ', 'G_TYPE_DOUBLE', 'double'),
    'float': ('gdouble ', 'G_TYPE_DOUBLE', 'double'),
    'char': ('gchar ', 'G_TYPE_CHAR', 'char'),
    'string': ('gchar *', 'G_TYPE_STRING', 'string'),
    'date': ('gchar *', 'GUPNP_TYPE_DATE', 'string'),
    'dateTime': ('gchar *', 'GUPNP_TYPE_DATE_TIME', 'string'),
    'dateTime.tz': ('gchar *', 'GUPNP_TYPE_DATE_TIME_TZ', 'string'),
    'time': ('gchar *', 'GUPNP_TYPE_TIME', 'string'),
    'time.tz': ('gchar *', 'GUPNP_TYPE_TIME_TZ', 'string'),
    'boolean': ('gboolean ', 'G_TYPE_BOOLEAN', 'boolean'),
    'bin.base64': ('gchar *', 'GUPNP_TYPE_BIN_BASE64', 'string'),
    'bin.hex': ('gchar *', 'GUPNP_TYPE_BIN_HEX', 'string'),
    'uri': ('gchar *', 'GUPNP_TYPE_URI', 'string'),
    'uuid': ('gchar *', 'GUPNP_TYPE_UUID', 'string'),
}


class Action:
    """One SCPD <action>, with its arguments split by direction."""

    def __init__(self):
        self.name = None             # name as written in the SCPD
        self.c_name = None           # lower_case form of the name
        self.c_prefixed_name = None  # c_name with the user prefix prepended
        self.in_args = []            # Argument objects with direction "in"
        self.out_args = []           # all other Argument objects
        self.notify_vars = []


class Argument:
    """One <argument> of an action."""

    def __init__(self):
        self.name = None         # name as written in the SCPD
        self.c_name = None       # lower_case form of the name
        self.direction = None    # text of the <direction> element
        self.related_var = None  # Variable named by <relatedStateVariable>


class Variable:
    """One SCPD <stateVariable>."""

    def __init__(self):
        self.name = None             # name as written in the SCPD
        self.c_name = None           # lower_case form of the name
        self.c_prefixed_name = None  # c_name with the user prefix prepended
        self.ctype = None            # C type string taken from typemap
        self.gtype = None            # GType macro name taken from typemap
        self.get_function = None     # matching g_value_get_* function name
        self.set_function = None     # matching g_value_set_* function name
        self.notified = True         # False when sendEvents="no"
        self.dummy = False           # True for A_ARG_TYPE_* variables


def camelCaseToLowerCase(str):
    """Convert a CamelCase name to a lower_case_with_underscores C name.

    Every character left outside [a-z0-9] after lower-casing is replaced by
    an underscore, so the result only contains [a-z0-9_].
    """
    # http://www.djangosnippets.org/snippets/585/
    tmp = re.sub(r'(((?<=[a-z])[A-Z])|([A-Z](?![A-Z]|$)))', r'_\1', str)
    lower_case = tmp.lower().strip('_')
    return re.sub(r'[^a-z0-9]', '_', lower_case)


def _childText(element, tag, owner):
    """Return the text of the SCPD child *tag* of *element*.

    Raises Exception (naming *tag* and *owner*) when the child element is
    missing or has no text, instead of failing on None later on.
    """
    child = element.find(_NS + tag)
    text = None if child is None else child.text
    if text is None:
        raise Exception("No %s found for %s" % (tag, owner))
    return text


def getActions(action_elements, prefix, variables):
    """Parse the action element list into a list of Action objects.

    action_elements -- iterable of <action> elements
    prefix          -- string prepended to each generated C name
    variables       -- Variable objects used to resolve each argument's
                       <relatedStateVariable>

    Raises Exception when a name is missing or a related state variable
    cannot be resolved.
    """
    # Index the variables once; setdefault keeps the first of any duplicates,
    # which is what a front-to-back linear search would pick.
    vars_by_name = {}
    for var in variables:
        vars_by_name.setdefault(var.name, var)

    arg_path = _NS + "argumentList/" + _NS + "argument"

    actions = []
    for actionElement in action_elements:
        a = Action()
        actions.append(a)

        a.name = _childText(actionElement, "name", "action")
        a.c_name = camelCaseToLowerCase(a.name)
        a.c_prefixed_name = prefix + a.c_name

        for argElement in actionElement.findall(arg_path):
            arg = Argument()

            arg.name = _childText(argElement, "name", "argument")
            arg.c_name = camelCaseToLowerCase(arg.name)

            var_name = _childText(argElement, "relatedStateVariable",
                                  "argument %s" % arg.name)
            arg.related_var = vars_by_name.get(var_name)
            if arg.related_var is None:
                raise Exception("%s: related state variable %s not found"
                                % (arg.name, var_name))

            direction_element = argElement.find(_NS + "direction")
            if direction_element is None:
                raise Exception("%s: no direction found" % arg.name)
            arg.direction = direction_element.text

            # Anything that is not explicitly "in" is treated as an out arg.
            if arg.direction == "in":
                a.in_args.append(arg)
            else:
                a.out_args.append(arg)

    return actions


def getVariables(var_elements, prefix):
    """Parse the state variable element list into a list of Variable objects.

    var_elements -- iterable of <stateVariable> elements
    prefix       -- string prepended to each generated C name

    Raises Exception when a variable has no name or its dataType is not a
    key of typemap.
    """
    variables = []
    for varElement in var_elements:
        var = Variable()
        variables.append(var)

        var.name = _childText(varElement, "name", "state variable")
        if var.name.startswith("A_ARG_TYPE_"):
            # dummy state variable, only there to give type info to
            # getter/setter
            var.dummy = True

        var.c_name = camelCaseToLowerCase(var.name)
        var.c_prefixed_name = prefix + var.c_name

        if varElement.get("sendEvents") == "no":
            var.notified = False

        # A missing or empty <dataType> ends up as None and is reported by
        # the same "Unknown dataType" error as an unsupported type.
        type_element = varElement.find(_NS + "dataType")
        dataType = None if type_element is None else type_element.text
        if dataType not in typemap:
            raise Exception("Unknown dataType %s" % dataType)
        (var.ctype, var.gtype, g_value_type) = typemap[dataType]
        var.get_function = "g_value_get_" + g_value_type
        var.set_function = "g_value_set_" + g_value_type

    return variables


def printClientSyncActionBinding(a):
    """Print the synchronous client wrapper for Action *a*."""
    # Width of "<name> (" so continuation parameters line up.
    indent = (2 + len(a.c_prefixed_name)) * " "

    print("static inline gboolean")
    print("%s (GUPnPServiceProxy *proxy," % a.c_prefixed_name)

    for arg in a.in_args:
        print("%sconst %sin_%s," % (indent, arg.related_var.ctype, arg.c_name))

    for arg in a.out_args:
        print("%s%s*out_%s," % (indent, arg.related_var.ctype, arg.c_name))

    print("%sGError **error)" % indent)

    print("{")

    print(" return gupnp_service_proxy_send_action")
    print(" (proxy, \"%s\", error," % a.name)

    for arg in a.in_args:
        print(" \"%s\", %s, in_%s," % (arg.name, arg.related_var.gtype, arg.c_name))
    print(" NULL,")

    for arg in a.out_args:
        print(" \"%s\", %s, out_%s," % (arg.name, arg.related_var.gtype, arg.c_name))
    print(" NULL);")

    print("}\n")


def printClientAsyncActionBinding(a):
    """Print the asynchronous client wrapper for Action *a*.

    Three pieces are emitted: the user reply-callback typedef, a private
    handler that ends the action and calls the user callback, and the
    public <name>_async entry point.
    """
    # User-callback prototype
    indent = (24 + len(a.c_prefixed_name)) * " "
    print("typedef void (*%s_reply) (GUPnPServiceProxy *proxy," % a.c_prefixed_name)
    for arg in a.out_args:
        print("%sconst %sout_%s," % (indent, arg.related_var.ctype, arg.c_name))
    print("%sGError *error," % indent)
    print("%sgpointer userdata);" % indent)
    print("")

    # Generated async callback handler, calls user-provided callback
    indent = (30 + len(a.c_prefixed_name)) * " "
    print("static void _%s_async_callback (GUPnPServiceProxy *proxy," % a.c_prefixed_name)
    print("%sGUPnPServiceProxyAction *action," % indent)
    print("%sgpointer user_data)" % indent)
    print("{")
    print(" GUPnPAsyncData *cbdata;")
    print(" GError *error = NULL;")
    for arg in a.out_args:
        print(" %s%s;" % (arg.related_var.ctype, arg.c_name))
    print("")
    print(" cbdata = (GUPnPAsyncData *) user_data;")
    print(" gupnp_service_proxy_end_action")
    print(" (proxy, action, &error,")
    for arg in a.out_args:
        print(" \"%s\", %s, &%s," % (arg.name, arg.related_var.gtype, arg.c_name))
    print(" NULL);")
    print(" ((%s_reply)cbdata->cb)" % a.c_prefixed_name)
    print(" (proxy,")
    for arg in a.out_args:
        print(" %s," % arg.c_name)
    print(" error, cbdata->userdata);")
    print("")
    print(" g_slice_free1 (sizeof (*cbdata), cbdata);")
    print("}")
    print("")

    # Entry point
    indent = (8 + len(a.c_prefixed_name)) * " "
    print("static inline GUPnPServiceProxyAction *")
    print("%s_async (GUPnPServiceProxy *proxy," % a.c_prefixed_name)
    for arg in a.in_args:
        print("%sconst %sin_%s," % (indent, arg.related_var.ctype, arg.c_name))
    print("%s%s_reply callback," % (indent, a.c_prefixed_name))
    print("%sgpointer userdata)" % indent)
    print("{")
    print(" GUPnPServiceProxyAction* action;")
    print(" GUPnPAsyncData *cbdata;")
    print("")
    print(" cbdata = (GUPnPAsyncData *) g_slice_alloc (sizeof (*cbdata));")
    print(" cbdata->cb = G_CALLBACK (callback);")
    print(" cbdata->userdata = userdata;")
    print(" action = gupnp_service_proxy_begin_action")
    print(" (proxy, \"%s\"," % a.name)
    print(" _%s_async_callback, cbdata," % a.c_prefixed_name)
    for arg in a.in_args:
        print(" \"%s\", %s, in_%s," % (arg.name, arg.related_var.gtype, arg.c_name))
    print(" NULL);")
    print("")
    print(" return action;")
    print("}")


def printClientVariableNotifyBinding(v):
    """Print the client change-notification wrapper for Variable *v*.

    Three pieces are emitted: the user callback typedef, a private callback
    that unpacks the GValue, and the public <name>_add_notify function.
    """
    # gchar-based values are handed to the user callback as const.
    ctype = re.sub("^gchar", "const gchar", v.ctype)

    # callback prototype
    indent = (22 + len(v.c_prefixed_name)) * " "
    print("typedef void")
    print("(*%s_changed_callback) (GUPnPServiceProxy *proxy," % v.c_prefixed_name)
    print("%s%s%s," % (indent, ctype, v.c_name))
    print("%sgpointer userdata);" % indent)
    print("")

    # private callback
    indent = (20 + len(v.c_prefixed_name)) * " "
    print("static void")
    print("_%s_changed_callback (GUPnPServiceProxy *proxy," % v.c_prefixed_name)
    print("%sconst gchar *variable," % indent)
    print("%sGValue *value," % indent)
    print("%sgpointer userdata)" % indent)
    print("{")
    print(" GUPnPAsyncData *cbdata;")
    print(" %s%s;" % (ctype, v.c_name))
    print("")
    print(" cbdata = (GUPnPAsyncData *) userdata;")
    print(" %s = %s (value);" % (v.c_name, v.get_function))
    print(" ((%s_changed_callback)cbdata->cb)" % v.c_prefixed_name)
    print(" (proxy,")
    print(" %s," % v.c_name)
    print(" cbdata->userdata);")
    print("")
    print(" g_slice_free1 (sizeof (*cbdata), cbdata);")
    print("}")
    print("")

    # public notify_add function
    indent = (13 + len(v.c_prefixed_name)) * " "
    print("static inline gboolean")
    print("%s_add_notify (GUPnPServiceProxy *proxy," % v.c_prefixed_name)
    print("%s%s_changed_callback callback," % (indent, v.c_prefixed_name))
    print("%sgpointer userdata)" % indent)
    print("{")
    print(" GUPnPAsyncData *cbdata;")
    print("")
    print(" cbdata = (GUPnPAsyncData *) g_slice_alloc (sizeof (*cbdata));")
    print(" cbdata->cb = G_CALLBACK (callback);")
    print(" cbdata->userdata = userdata;")
    print("")
    print(" return gupnp_service_proxy_add_notify")
    print(" (proxy,")
    print(" \"%s\"," % v.name)
    print(" %s," % v.gtype)
    print(" _%s_changed_callback," % v.c_prefixed_name)
    print(" cbdata);")
    print("}")


def printServerVariableQueryBinding(v):
    """Print the server "query-variable" wrapper for Variable *v*.

    Three pieces are emitted: the user callback typedef, a private signal
    handler that stores the callback's result in the GValue, and the public
    <name>_query_connect function.
    """
    # User callback proto
    indent = (28 + len(v.ctype) + len(v.c_prefixed_name)) * " "
    print("typedef %s(*%s_query_callback) (GUPnPService *service," % (v.ctype, v.c_prefixed_name))
    print("%sgpointer userdata);" % indent)
    print("")

    # Private signal handler
    indent = (12 + len(v.c_prefixed_name)) * " "
    print("static void")
    print("_%s_query_cb (GUPnPService *service," % v.c_prefixed_name)
    print("%sgchar *variable," % indent)
    print("%sGValue *value," % indent)
    print("%sgpointer userdata)" % indent)
    print("{")
    print(" GUPnPAsyncData *cbdata;")
    print(" %s%s;" % (v.ctype, v.c_name))
    print("")

    indent = (36 + len(v.c_name) + len(v.c_prefixed_name)) * " "
    print(" cbdata = (GUPnPAsyncData *) userdata;")
    print(" %s = ((%s_query_callback)cbdata->cb) (service," % (v.c_name, v.c_prefixed_name))
    print("%scbdata->userdata);" % indent)
    print(" g_value_init (value, %s);" % v.gtype)
    print(" %s (value, %s);" % (v.set_function, v.c_name))
    print("}")
    print("")

    # Public connect function
    indent = (16 + len(v.c_prefixed_name)) * " "
    print("gulong")
    print("%s_query_connect (GUPnPService *service," % v.c_prefixed_name)
    print("%s%s_query_callback callback," % (indent, v.c_prefixed_name))
    print("%sgpointer userdata)" % indent)
    print("{")
    print(" GUPnPAsyncData *cbdata;")
    print("")
    print(" cbdata = (GUPnPAsyncData *) g_slice_alloc (sizeof (*cbdata));")
    print(" cbdata->cb = G_CALLBACK (callback);")
    print(" cbdata->userdata = userdata;")
    print("")
    print(" return g_signal_connect_data (service,")
    print(" \"query-variable::%s\"," % v.name)
    print(" G_CALLBACK (_%s_query_cb)," % v.c_prefixed_name)
    print(" cbdata,")
    print(" _free_cb_data,")
    print(" (GConnectFlags) 0);")
    print("}")
    print("")


def printServerActionBinding(a):
    """Print the server wrappers for Action *a*.

    <name>_action_get is emitted only when the action has in arguments and
    <name>_action_set only when it has out arguments; <name>_action_connect
    is always emitted.
    """
    # getter and setter func for GUPnPServiceAction
    indent = (13 + len(a.c_prefixed_name)) * " "
    if a.in_args:
        last = a.in_args[-1]
        print("static inline void")
        print("%s_action_get (GUPnPServiceAction *action," % (a.c_prefixed_name))
        # The last parameter closes the prototype, so it is printed apart.
        for arg in a.in_args[:-1]:
            print("%s%s*in_%s," % (indent, arg.related_var.ctype, arg.c_name))
        print("%s%s*in_%s)" % (indent, last.related_var.ctype, last.c_name))
        print("{")
        print(" gupnp_service_action_get (action,")
        for arg in a.in_args:
            print(" \"%s\", %s, in_%s," % (arg.name, arg.related_var.gtype, arg.c_name))
        print(" NULL);")
        print("}")
        print("")
    if a.out_args:
        last = a.out_args[-1]
        print("static inline void")
        print("%s_action_set (GUPnPServiceAction *action," % (a.c_prefixed_name))
        for arg in a.out_args[:-1]:
            print("%sconst %sout_%s," % (indent, arg.related_var.ctype, arg.c_name))
        print("%sconst %sout_%s)" % (indent, last.related_var.ctype, last.c_name))
        print("{")
        print(" gupnp_service_action_set (action,")
        for arg in a.out_args:
            print(" \"%s\", %s, out_%s," % (arg.name, arg.related_var.gtype, arg.c_name))
        print(" NULL);")
        print("}")
        print("")

    indent = (17 + len(a.c_prefixed_name)) * " "
    print("static inline gulong")
    print("%s_action_connect (GUPnPService *service," % a.c_prefixed_name)
    print("%sGCallback callback," % indent)
    print("%sgpointer userdata)" % indent)
    print("{")
    print(" return g_signal_connect (service,")
    print(" \"action-invoked::%s\"," % a.name)
    print(" callback,")
    print(" userdata);")
    print("}")
    print("")


def PrintServerVariableNotifyBinding(v):
    """Print the server <name>_variable_notify wrapper for Variable *v*."""
    indent = (18 + len(v.c_prefixed_name)) * " "
    print("void")
    print("%s_variable_notify (GUPnPService *service," % v.c_prefixed_name)
    print("%sconst %s%s)" % (indent, v.ctype, v.c_name))
    print("{")
    print(" gupnp_service_notify (service,")
    print(" \"%s\", %s, %s," % (v.name, v.gtype, v.c_name))
    print(" NULL);")
    print("}")
    print("")


def parseSCPD(scpd, prefix):
    """Parse the scpd file into lists of Action and Variable objects.

    scpd   -- parsed document; only its findall() method is used
    prefix -- user prefix; when non-empty it is lower-cased and followed by
              an underscore before being prepended to generated names

    Returns an (actions, variables) tuple.
    """
    if prefix != "":
        prefix = prefix.lower() + "_"

    action_elements = scpd.findall(_NS + "actionList/" + _NS + "action")
    var_elements = scpd.findall(_NS + "serviceStateTable/" + _NS + "stateVariable")

    # Variables first: actions resolve their arguments against them.
    variables = getVariables(var_elements, prefix)
    actions = getActions(action_elements, prefix, variables)

    return (actions, variables)


def _printBindingHeader(define):
    """Print the prologue shared by the client and server headers.

    *define* is the include-guard macro name.
    """
    print("/* Generated by gupnp-binding-tool */")
    print("")
    print("#include <libgupnp/gupnp.h>")
    print("")
    print("#ifndef %s" % define)
    print("#define %s" % define)
    print("")
    print("G_BEGIN_DECLS")

    # Guarded separately because both header kinds emit this typedef.
    print("\n#ifndef __GUPNPASYNCDATA_TYPE__")
    print("#define __GUPNPASYNCDATA_TYPE__")
    print("typedef struct {GCallback cb; gpointer userdata; } GUPnPAsyncData;")
    print("#endif")


def _printBindingFooter(define):
    """Print the epilogue matching _printBindingHeader(define)."""
    print("")
    print("G_END_DECLS")
    print("")
    print("#endif /* %s */" % define)


def printClientBindings(binding_name, actions, variables):
    """Print the complete client-side header to stdout.

    binding_name -- suffix of the include-guard macro
    actions      -- Action objects; each gets a sync and an async wrapper
    variables    -- Variable objects; only those that are evented and not
                    dummies get a notify wrapper
    """
    define = "GUPNP_GENERATED_CLIENT_BINDING_%s" % binding_name

    _printBindingHeader(define)

    for a in actions:
        print("\n/* action %s */\n" % a.name)
        printClientSyncActionBinding(a)
        printClientAsyncActionBinding(a)

    for v in variables:
        if (not v.dummy) and v.notified:
            print("\n/* state variable %s */\n" % v.name)
            printClientVariableNotifyBinding(v)

    _printBindingFooter(define)


def printServerBindings(binding_name, actions, variables):
    """Print the complete server-side header to stdout.

    binding_name -- suffix of the include-guard macro
    actions      -- Action objects; each gets get/set/connect wrappers
    variables    -- Variable objects; every non-dummy one gets a query
                    wrapper, and evented ones also get a notify wrapper
    """
    # Distinct from the client guard so both generated headers for the same
    # prefix can be included in one translation unit.
    define = "GUPNP_GENERATED_SERVER_BINDING_%s" % binding_name

    _printBindingHeader(define)

    print("static void")
    print("_free_cb_data (gpointer data, GClosure *closure)")
    print("{")
    print(" GUPnPAsyncData *cbdata = (GUPnPAsyncData *) data;")
    print(" g_slice_free1 (sizeof (*cbdata), cbdata);")
    print("}")
    print("")

    for a in actions:
        print("\n/* action %s */\n" % a.name)
        printServerActionBinding(a)

    for v in variables:
        if not v.dummy:
            print("\n/* state variable %s */\n" % v.name)
            printServerVariableQueryBinding(v)
            if v.notified:
                PrintServerVariableNotifyBinding(v)

    _printBindingFooter(define)


def main():
    """Parse the command line and print the requested bindings to stdout."""
    parser = OptionParser("Usage: %prog [options] service-filename")
    parser.add_option("-p", "--prefix", dest="prefix",
                      metavar="PREFIX", default="",
                      help="set prefix for generated function names")
    parser.add_option("-m", "--mode", type="choice", dest="mode",
                      metavar="MODE", default="client",
                      choices=("client", "server"),
                      help="select generation mode, 'client' or 'server'")

    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.error("Expected 1 argument, got %d" % len(args))

    # get a name for header ifdef
    if options.prefix == "":
        base = re.sub("[^a-zA-Z0-9]", "_", os.path.basename(args[0]))
        binding_name = base.upper()
    else:
        binding_name = options.prefix.upper()

    # parse scpd file, extract action list and state variable list
    scpd = ET.parse(args[0])
    (actions, variables) = parseSCPD(scpd, options.prefix)
    if not actions and not variables:
        raise Exception("No actions or variables found in document")

    # generate bindings
    if options.mode == "client":
        printClientBindings(binding_name, actions, variables)
    else:
        printServerBindings(binding_name, actions, variables)


if __name__ == "__main__":
    main()