summaryrefslogtreecommitdiff
path: root/qpid/extras/dispatch/src/router_node.c
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2013-07-08 21:43:48 +0000
committerTed Ross <tross@apache.org>2013-07-08 21:43:48 +0000
commit344fd5a1565f4088b9099bd74c95b836b0bbffab (patch)
tree3f3fcfb66844d720688a86bb5f365267b444eb2f /qpid/extras/dispatch/src/router_node.c
parenta0bb0a54b3d375ca6456e865350ecc8b27f07a42 (diff)
downloadqpid-python-344fd5a1565f4088b9099bd74c95b836b0bbffab.tar.gz
QPID-4968 - Added an IO adapter for python modules to send and receive messages
QPID-4967 - Integrated the python router into the main program - Updated the log module: added the full complement of severity levels - Added stub versions of the dispatch python adapters so the python components can be tested in a standalone environment. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1500977 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/extras/dispatch/src/router_node.c')
-rw-r--r--qpid/extras/dispatch/src/router_node.c219
1 files changed, 217 insertions, 2 deletions
diff --git a/qpid/extras/dispatch/src/router_node.c b/qpid/extras/dispatch/src/router_node.c
index 010b008f93..d1e307e754 100644
--- a/qpid/extras/dispatch/src/router_node.c
+++ b/qpid/extras/dispatch/src/router_node.c
@@ -17,6 +17,7 @@
* under the License.
*/
+#include <qpid/dispatch/python_embedded.h>
#include <stdio.h>
#include <string.h>
#include <qpid/dispatch.h>
@@ -24,6 +25,9 @@
static char *module = "ROUTER";
+static void dx_router_python_setup(dx_router_t *router);
+static void dx_pyrouter_tick(dx_router_t *router);
+
//static char *local_prefix = "_local/";
//static char *topo_prefix = "_topo/";
@@ -54,6 +58,8 @@ struct dx_router_t {
dx_timer_t *timer;
hash_t *out_hash;
uint64_t dtag;
+ PyObject *pyRouter;
+ PyObject *pyTick;
};
@@ -459,6 +465,8 @@ static void dx_router_timer_handler(void *context)
//
// Periodic processing.
//
+ dx_pyrouter_tick(router);
+
dx_timer_schedule(router->timer, 1000);
}
@@ -498,10 +506,10 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id)
router->router_id = id;
router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router);
- dx_timer_schedule(router->timer, 0); // Immediate
router->out_hash = hash(10, 32, 0);
- router->dtag = 1;
+ router->dtag = 1;
+ router->pyRouter = 0;
//
// Inform the field iterator module of this router's id and area. The field iterator
@@ -509,6 +517,11 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id)
//
dx_field_iterator_set_address(area, id);
+ //
+ // Set up the usage of the embedded python router module.
+ //
+ dx_python_start();
+
dx_log(module, LOG_INFO, "Router started, area=%s id=%s", area, id);
return router;
@@ -517,6 +530,9 @@ dx_router_t *dx_router(dx_dispatch_t *dx, const char *area, const char *id)
void dx_router_setup_agent(dx_dispatch_t *dx)
{
+ dx_router_python_setup(dx->router);
+ dx_timer_schedule(dx->router->timer, 1000);
+
// TODO
}
@@ -526,6 +542,7 @@ void dx_router_free(dx_router_t *router)
dx_container_set_default_node_type(router->dx, 0, 0, DX_DIST_BOTH);
sys_mutex_free(router->lock);
free(router);
+ dx_python_stop();
}
@@ -595,3 +612,201 @@ void dx_router_send(dx_dispatch_t *dx,
sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher?
}
+
+//===============================================================================
+// Python Router Adapter
+//===============================================================================
+
+typedef struct {
+ PyObject_HEAD
+ dx_router_t *router;
+} RouterAdapter;
+
+
+static PyObject* dx_router_add_route(PyObject *self, PyObject *args)
+{
+ //RouterAdapter *adapter = (RouterAdapter*) self;
+ const char *addr;
+ const char *peer;
+
+ if (!PyArg_ParseTuple(args, "ss", &addr, &peer))
+ return 0;
+
+ // TODO
+
+ Py_INCREF(Py_None);
+ return Py_None;
+}
+
+
+static PyObject* dx_router_del_route(PyObject *self, PyObject *args)
+{
+ //RouterAdapter *adapter = (RouterAdapter*) self;
+ const char *addr;
+ const char *peer;
+
+ if (!PyArg_ParseTuple(args, "ss", &addr, &peer))
+ return 0;
+
+ // TODO
+
+ Py_INCREF(Py_None);
+ return Py_None;
+}
+
+
+static PyMethodDef RouterAdapter_methods[] = {
+ {"add_route", dx_router_add_route, METH_VARARGS, "Add a newly discovered route"},
+ {"del_route", dx_router_del_route, METH_VARARGS, "Delete a route"},
+ {0, 0, 0, 0}
+};
+
+static PyTypeObject RouterAdapterType = {
+ PyObject_HEAD_INIT(0)
+ 0, /* ob_size*/
+ "dispatch.RouterAdapter", /* tp_name*/
+ sizeof(RouterAdapter), /* tp_basicsize*/
+ 0, /* tp_itemsize*/
+ 0, /* tp_dealloc*/
+ 0, /* tp_print*/
+ 0, /* tp_getattr*/
+ 0, /* tp_setattr*/
+ 0, /* tp_compare*/
+ 0, /* tp_repr*/
+ 0, /* tp_as_number*/
+ 0, /* tp_as_sequence*/
+ 0, /* tp_as_mapping*/
+ 0, /* tp_hash */
+ 0, /* tp_call*/
+ 0, /* tp_str*/
+ 0, /* tp_getattro*/
+ 0, /* tp_setattro*/
+ 0, /* tp_as_buffer*/
+ Py_TPFLAGS_DEFAULT, /* tp_flags*/
+ "Dispatch Router Adapter", /* tp_doc */
+ 0, /* tp_traverse */
+ 0, /* tp_clear */
+ 0, /* tp_richcompare */
+ 0, /* tp_weaklistoffset */
+ 0, /* tp_iter */
+ 0, /* tp_iternext */
+ RouterAdapter_methods, /* tp_methods */
+ 0, /* tp_members */
+ 0, /* tp_getset */
+ 0, /* tp_base */
+ 0, /* tp_dict */
+ 0, /* tp_descr_get */
+ 0, /* tp_descr_set */
+ 0, /* tp_dictoffset */
+ 0, /* tp_init */
+ 0, /* tp_alloc */
+ 0, /* tp_new */
+ 0, /* tp_free */
+ 0, /* tp_is_gc */
+ 0, /* tp_bases */
+ 0, /* tp_mro */
+ 0, /* tp_cache */
+ 0, /* tp_subclasses */
+ 0, /* tp_weaklist */
+ 0, /* tp_del */
+ 0 /* tp_version_tag */
+};
+
+
+static void dx_router_python_setup(dx_router_t *router)
+{
+ PyObject *pDispatchModule = dx_python_module();
+
+ RouterAdapterType.tp_new = PyType_GenericNew;
+ if (PyType_Ready(&RouterAdapterType) < 0) {
+ PyErr_Print();
+ dx_log(module, LOG_CRITICAL, "Unable to initialize the Python Router Adapter");
+ return;
+ }
+
+ Py_INCREF(&RouterAdapterType);
+ PyModule_AddObject(pDispatchModule, "RouterAdapter", (PyObject*) &RouterAdapterType);
+
+ //
+ // Attempt to import the Python Router module
+ //
+ PyObject* pName;
+ PyObject* pId;
+ PyObject* pArea;
+ PyObject* pModule;
+ PyObject* pClass;
+ PyObject* pArgs;
+
+ pName = PyString_FromString("router");
+ pModule = PyImport_Import(pName);
+ Py_DECREF(pName);
+ if (!pModule) {
+ dx_log(module, LOG_CRITICAL, "Can't Locate 'router' Python module");
+ return;
+ }
+
+ pClass = PyObject_GetAttrString(pModule, "RouterEngine");
+ if (!pClass || !PyClass_Check(pClass)) {
+ dx_log(module, LOG_CRITICAL, "Can't Locate 'RouterEngine' class in the 'router' module");
+ return;
+ }
+
+ PyObject *adapterType = PyObject_GetAttrString(pDispatchModule, "RouterAdapter");
+ PyObject *adapterInstance = PyObject_CallObject(adapterType, 0);
+ assert(adapterInstance);
+
+ ((RouterAdapter*) adapterInstance)->router = router;
+
+ //
+ // Constructor Arguments for RouterEngine
+ //
+ pArgs = PyTuple_New(3);
+
+ // arg 0: adapter instance
+ PyTuple_SetItem(pArgs, 0, adapterInstance);
+
+ // arg 1: router_id
+ pId = PyString_FromString(router->router_id);
+ PyTuple_SetItem(pArgs, 1, pId);
+
+ // arg 2: area id
+ pArea = PyString_FromString(router->router_area);
+ PyTuple_SetItem(pArgs, 2, pArea);
+
+ //
+ // Instantiate the router
+ //
+ router->pyRouter = PyInstance_New(pClass, pArgs, 0);
+ Py_DECREF(pArgs);
+ Py_DECREF(adapterType);
+
+ if (!router->pyRouter) {
+ PyErr_Print();
+ dx_log(module, LOG_CRITICAL, "'RouterEngine' class cannot be instantiated");
+ return;
+ }
+
+ router->pyTick = PyObject_GetAttrString(router->pyRouter, "handleTimerTick");
+ if (!router->pyTick || !PyCallable_Check(router->pyTick)) {
+ dx_log(module, LOG_CRITICAL, "'RouterEngine' class has no handleTimerTick method");
+ return;
+ }
+}
+
+
+static void dx_pyrouter_tick(dx_router_t *router)
+{
+ PyObject *pArgs;
+ PyObject *pValue;
+
+ pArgs = PyTuple_New(0);
+ pValue = PyObject_CallObject(router->pyTick, pArgs);
+ if (PyErr_Occurred()) {
+ PyErr_Print();
+ }
+ Py_DECREF(pArgs);
+ if (pValue) {
+ Py_DECREF(pValue);
+ }
+}
+