diff options
| author | Ted Ross <tross@apache.org> | 2013-07-08 21:43:48 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2013-07-08 21:43:48 +0000 |
| commit | 344fd5a1565f4088b9099bd74c95b836b0bbffab (patch) | |
| tree | 3f3fcfb66844d720688a86bb5f365267b444eb2f /qpid/extras/dispatch/src/router_node.c | |
| parent | a0bb0a54b3d375ca6456e865350ecc8b27f07a42 (diff) | |
| download | qpid-python-344fd5a1565f4088b9099bd74c95b836b0bbffab.tar.gz | |
QPID-4968 - Added an IO adapter for python modules to send and receive messages
QPID-4967 - Integrated the python router into the main program
- Updated the log module: added the full complement of severity levels
- Added stub versions of the dispatch python adapters so the python components can be
tested in a standalone environment.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1500977 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/extras/dispatch/src/router_node.c')
| -rw-r--r-- | qpid/extras/dispatch/src/router_node.c | 219 |
1 files changed, 217 insertions, 2 deletions
diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c index 010b008f93..d1e307e754 100644 --- a/qpid/extras/dispatch/src/router_node.c +++ b/qpid/extras/dispatch/src/router_node.c @@ -17,6 +17,7 @@ * under the License. */ +#include <qpid/dispatch/python_embedded.h> #include <stdio.h> #include <string.h> #include <qpid/dispatch.h> @@ -24,6 +25,9 @@ static char *module = "ROUTER"; +static void dx_router_python_setup(dx_router_t *router); +static void dx_pyrouter_tick(dx_router_t *router); + //static char *local_prefix = "_local/"; //static char *topo_prefix = "_topo/"; @@ -54,6 +58,8 @@ struct dx_router_t { dx_timer_t *timer; hash_t *out_hash; uint64_t dtag; + PyObject *pyRouter; + PyObject *pyTick; }; @@ -459,6 +465,8 @@ static void dx_router_timer_handler(void *context) // // Periodic processing. // + dx_pyrouter_tick(router); + dx_timer_schedule(router->timer, 1000); } @@ -498,10 +506,10 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id) router->router_id = id; router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router); - dx_timer_schedule(router->timer, 0); // Immediate router->out_hash = hash(10, 32, 0); - router->dtag = 1; + router->dtag = 1; + router->pyRouter = 0; // // Inform the field iterator module of this router's id and area. The field iterator @@ -509,6 +517,11 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id) // dx_field_iterator_set_address(area, id); + // + // Set up the usage of the embedded python router module. + // + dx_python_start(); + dx_log(module, LOG_INFO, "Router started, area=%s id=%s", area, id); return router; @@ -517,6 +530,9 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id) void dx_router_setup_agent(dx_dispatch_t *dx) { + dx_router_python_setup(dx->router); + dx_timer_schedule(dx->router->timer, 1000); + // TODO } @@ -526,6 +542,7 @@ void dx_router_free(dx_router_t *router) dx_container_set_default_node_type(router->dx, 0, 0, DX_DIST_BOTH); sys_mutex_free(router->lock); free(router); + dx_python_stop(); } @@ -595,3 +612,201 @@ void dx_router_send(dx_dispatch_t *dx, sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher? } + +//=============================================================================== +// Python Router Adapter +//=============================================================================== + +typedef struct { + PyObject_HEAD + dx_router_t *router; +} RouterAdapter; + + +static PyObject* dx_router_add_route(PyObject *self, PyObject *args) +{ + //RouterAdapter *adapter = (RouterAdapter*) self; + const char *addr; + const char *peer; + + if (!PyArg_ParseTuple(args, "ss", &addr, &peer)) + return 0; + + // TODO + + Py_INCREF(Py_None); + return Py_None; +} + + +static PyObject* dx_router_del_route(PyObject *self, PyObject *args) +{ + //RouterAdapter *adapter = (RouterAdapter*) self; + const char *addr; + const char *peer; + + if (!PyArg_ParseTuple(args, "ss", &addr, &peer)) + return 0; + + // TODO + + Py_INCREF(Py_None); + return Py_None; +} + + +static PyMethodDef RouterAdapter_methods[] = { + {"add_route", dx_router_add_route, METH_VARARGS, "Add a newly discovered route"}, + {"del_route", dx_router_del_route, METH_VARARGS, "Delete a route"}, + {0, 0, 0, 0} +}; + +static PyTypeObject RouterAdapterType = { + PyObject_HEAD_INIT(0) + 0, /* ob_size*/ + "dispatch.RouterAdapter", /* tp_name*/ + sizeof(RouterAdapter), /* tp_basicsize*/ + 0, /* tp_itemsize*/ + 0, /* tp_dealloc*/ + 0, /* tp_print*/ + 0, /* tp_getattr*/ + 0, /* tp_setattr*/ + 0, /* tp_compare*/ + 0, /* tp_repr*/ + 0, /* tp_as_number*/ + 0, /* tp_as_sequence*/ + 0, /* tp_as_mapping*/ + 0, /* tp_hash */ + 0, /* tp_call*/ + 0, /* tp_str*/ + 0, /* tp_getattro*/ + 0, /* tp_setattro*/ + 0, /* tp_as_buffer*/ + Py_TPFLAGS_DEFAULT, /* tp_flags*/ + "Dispatch Router Adapter", /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + RouterAdapter_methods, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + 0, /* tp_init */ + 0, /* tp_alloc */ + 0, /* tp_new */ + 0, /* tp_free */ + 0, /* tp_is_gc */ + 0, /* tp_bases */ + 0, /* tp_mro */ + 0, /* tp_cache */ + 0, /* tp_subclasses */ + 0, /* tp_weaklist */ + 0, /* tp_del */ + 0 /* tp_version_tag */ +}; + + +static void dx_router_python_setup(dx_router_t *router) +{ + PyObject *pDispatchModule = dx_python_module(); + + RouterAdapterType.tp_new = PyType_GenericNew; + if (PyType_Ready(&RouterAdapterType) < 0) { + PyErr_Print(); + dx_log(module, LOG_CRITICAL, "Unable to initialize the Python Router Adapter"); + return; + } + + Py_INCREF(&RouterAdapterType); + PyModule_AddObject(pDispatchModule, "RouterAdapter", (PyObject*) &RouterAdapterType); + + // + // Attempt to import the Python Router module + // + PyObject* pName; + PyObject* pId; + PyObject* pArea; + PyObject* pModule; + PyObject* pClass; + PyObject* pArgs; + + pName = PyString_FromString("router"); + pModule = PyImport_Import(pName); + Py_DECREF(pName); + if (!pModule) { + dx_log(module, LOG_CRITICAL, "Can't Locate 'router' Python module"); + return; + } + + pClass = PyObject_GetAttrString(pModule, "RouterEngine"); + if (!pClass || !PyClass_Check(pClass)) { + dx_log(module, LOG_CRITICAL, "Can't Locate 'RouterEngine' class in the 'router' module"); + return; + } + + PyObject *adapterType = PyObject_GetAttrString(pDispatchModule, "RouterAdapter"); + PyObject *adapterInstance = PyObject_CallObject(adapterType, 0); + assert(adapterInstance); + + ((RouterAdapter*) adapterInstance)->router = router; + + // + // Constructor Arguments for RouterEngine + // + pArgs = PyTuple_New(3); + + // arg 0: adapter instance + PyTuple_SetItem(pArgs, 0, adapterInstance); + + // arg 1: router_id + pId = PyString_FromString(router->router_id); + PyTuple_SetItem(pArgs, 1, pId); + + // arg 2: area id + pArea = PyString_FromString(router->router_area); + PyTuple_SetItem(pArgs, 2, pArea); + + // + // Instantiate the router + // + router->pyRouter = PyInstance_New(pClass, pArgs, 0); + Py_DECREF(pArgs); + Py_DECREF(adapterType); + + if (!router->pyRouter) { + PyErr_Print(); + dx_log(module, LOG_CRITICAL, "'RouterEngine' class cannot be instantiated"); + return; + } + + router->pyTick = PyObject_GetAttrString(router->pyRouter, "handleTimerTick"); + if (!router->pyTick || !PyCallable_Check(router->pyTick)) { + dx_log(module, LOG_CRITICAL, "'RouterEngine' class has no handleTimerTick method"); + return; + } +} + + +static void dx_pyrouter_tick(dx_router_t *router) +{ + PyObject *pArgs; + PyObject *pValue; + + pArgs = PyTuple_New(0); + pValue = PyObject_CallObject(router->pyTick, pArgs); + if (PyErr_Occurred()) { + PyErr_Print(); + } + Py_DECREF(pArgs); + if (pValue) { + Py_DECREF(pValue); + } +} + |
