#include #include #include #include #include #include #include #include "connection.h" #include "distmeta.h" #include "_amqstate.h" #define PYRABBITMQ_CONNECTION_ERROR 0x10 #define PYRABBITMQ_CHANNEL_ERROR 0x20 #define PYRABBITMQ_MODULE_NAME "_librabbitmq" #define PYRABBITMQ_MODULE_DESC "Hand-made wrapper for librabbitmq." /* ------: Private Prototypes :------------------------------------------- */ PyMODINIT_FUNC init_librabbitmq(void); extern PyObject *PyRabbitMQ_socket_timeout; /* Exceptions */ PyObject *PyRabbitMQExc_ConnectionError; PyObject *PyRabbitMQExc_ChannelError; PyObject *PyRabbitMQ_socket_timeout; _PYRMQ_INLINE amqp_table_entry_t* AMQTable_AddEntry(amqp_table_t*, amqp_bytes_t); _PYRMQ_INLINE void AMQTable_SetTableValue(amqp_table_t*, amqp_bytes_t, amqp_table_t); _PYRMQ_INLINE void AMQTable_SetArrayValue(amqp_table_t*, amqp_bytes_t, amqp_array_t); _PYRMQ_INLINE void AMQTable_SetStringValue(amqp_table_t*, amqp_bytes_t, amqp_bytes_t); _PYRMQ_INLINE void AMQTable_SetBoolValue(amqp_table_t*, amqp_bytes_t, int); _PYRMQ_INLINE void AMQTable_SetIntValue(amqp_table_t *, amqp_bytes_t, int); _PYRMQ_INLINE void AMQTable_SetNilValue(amqp_table_t *, amqp_bytes_t); _PYRMQ_INLINE void AMQTable_SetDoubleValue(amqp_table_t *, amqp_bytes_t, double); _PYRMQ_INLINE amqp_field_value_t* AMQArray_AddEntry(amqp_array_t*); _PYRMQ_INLINE void AMQArray_SetNilValue(amqp_array_t*); _PYRMQ_INLINE void AMQArray_SetTableValue(amqp_array_t*, amqp_table_t); _PYRMQ_INLINE void AMQArray_SetArrayValue(amqp_array_t*, amqp_array_t); _PYRMQ_INLINE void AMQArray_SetStringValue(amqp_array_t*, amqp_bytes_t); _PYRMQ_INLINE void AMQArray_SetIntValue(amqp_array_t *, int); _PYRMQ_INLINE int64_t RabbitMQ_now_usec(void); _PYRMQ_INLINE int RabbitMQ_wait_nb(int); _PYRMQ_INLINE int RabbitMQ_wait_timeout(int, double); _PYRMQ_INLINE void basic_properties_to_PyDict(amqp_basic_properties_t*, PyObject*); _PYRMQ_INLINE int PyDict_to_basic_properties(PyObject *, amqp_basic_properties_t *, amqp_connection_state_t, amqp_pool_t *); _PYRMQ_INLINE void amqp_basic_deliver_to_PyDict(PyObject *, uint64_t, amqp_bytes_t, amqp_bytes_t, amqp_boolean_t); _PYRMQ_INLINE int PyRabbitMQ_ApplyCallback(PyRabbitMQ_Connection *, PyObject *, PyObject *, PyObject *, PyObject *, PyObject *); int PyRabbitMQ_recv(PyRabbitMQ_Connection *, PyObject *, amqp_connection_state_t, int); unsigned int PyRabbitMQ_revive_channel(PyRabbitMQ_Connection *, unsigned int); unsigned int PyRabbitMQ_Connection_create_channel(PyRabbitMQ_Connection *, unsigned int); int PyRabbitMQ_HandleError(int, char const *); _PYRMQ_INLINE int PyRabbitMQ_HandlePollError(int); int PyRabbitMQ_HandleAMQError(PyRabbitMQ_Connection *, unsigned int, amqp_rpc_reply_t, const char *); void PyRabbitMQ_SetErr_UnexpectedHeader(amqp_frame_t*); int PyRabbitMQ_Not_Connected(PyRabbitMQ_Connection *); static amqp_table_t PyDict_ToAMQTable(amqp_connection_state_t, PyObject *, amqp_pool_t *); static amqp_array_t PyIter_ToAMQArray(amqp_connection_state_t, PyObject *, amqp_pool_t *); static PyObject* AMQTable_toPyDict(amqp_table_t *table); static PyObject* AMQArray_toPyList(amqp_array_t *array); int PyRabbitMQ_HandleAMQStatus(int, const char *); int PyRabbitMQ_Not_Connected(PyRabbitMQ_Connection *self) { if (!self->connected) { PyErr_SetString(PyRabbitMQExc_ConnectionError, "Operation on closed connection"); return 1; } return 0; } /* ------: AMQP Utils :--------------------------------------------------- */ _PYRMQ_INLINE amqp_table_entry_t* AMQTable_AddEntry(amqp_table_t *table, amqp_bytes_t key) { amqp_table_entry_t *entry = &table->entries[table->num_entries]; table->num_entries++; entry->key = key; return entry; } _PYRMQ_INLINE void AMQTable_SetTableValue(amqp_table_t *table, amqp_bytes_t key, amqp_table_t value) { amqp_table_entry_t *entry = AMQTable_AddEntry(table, key); entry->value.kind = AMQP_FIELD_KIND_TABLE; entry->value.value.table = value; } _PYRMQ_INLINE void AMQTable_SetArrayValue(amqp_table_t *table, amqp_bytes_t key, amqp_array_t value) { amqp_table_entry_t *entry = AMQTable_AddEntry(table, key); entry->value.kind = AMQP_FIELD_KIND_ARRAY; entry->value.value.array = value; } _PYRMQ_INLINE void AMQTable_SetStringValue(amqp_table_t *table, amqp_bytes_t key, amqp_bytes_t value) { amqp_table_entry_t *entry = AMQTable_AddEntry(table, key); entry->value.kind = AMQP_FIELD_KIND_UTF8; entry->value.value.bytes = value; } _PYRMQ_INLINE void AMQTable_SetBoolValue(amqp_table_t *table, amqp_bytes_t key, int value) { amqp_table_entry_t *entry = AMQTable_AddEntry(table, key); entry->value.kind = AMQP_FIELD_KIND_BOOLEAN; entry->value.value.boolean = value; } _PYRMQ_INLINE void AMQTable_SetIntValue(amqp_table_t *table, amqp_bytes_t key, int value) { amqp_table_entry_t *entry = AMQTable_AddEntry(table, key); entry->value.kind = AMQP_FIELD_KIND_I32; entry->value.value.i32 = value; } _PYRMQ_INLINE void AMQTable_SetNilValue(amqp_table_t *table, amqp_bytes_t key) { amqp_table_entry_t *entry = AMQTable_AddEntry(table, key); entry->value.kind = AMQP_FIELD_KIND_VOID; } _PYRMQ_INLINE void AMQTable_SetDoubleValue(amqp_table_t *table, amqp_bytes_t key, double value) { amqp_table_entry_t *entry = AMQTable_AddEntry(table, key); entry->value.kind = AMQP_FIELD_KIND_F64; entry->value.value.f64 = value; } _PYRMQ_INLINE amqp_field_value_t* AMQArray_AddEntry(amqp_array_t *array) { amqp_field_value_t *entry = &array->entries[array->num_entries]; array->num_entries++; return entry; } _PYRMQ_INLINE void AMQArray_SetNilValue(amqp_array_t *array) { amqp_field_value_t *entry = AMQArray_AddEntry(array); entry->kind = AMQP_FIELD_KIND_VOID; } _PYRMQ_INLINE void AMQArray_SetTableValue(amqp_array_t *array, amqp_table_t value) { amqp_field_value_t *entry = AMQArray_AddEntry(array); entry->kind = AMQP_FIELD_KIND_TABLE; entry->value.table = value; } _PYRMQ_INLINE void AMQArray_SetArrayValue(amqp_array_t *array, amqp_array_t value) { amqp_field_value_t *entry = AMQArray_AddEntry(array); entry->kind = AMQP_FIELD_KIND_ARRAY; entry->value.array = value; } _PYRMQ_INLINE void AMQArray_SetStringValue(amqp_array_t *array, amqp_bytes_t value) { amqp_field_value_t *entry = AMQArray_AddEntry(array); entry->kind = AMQP_FIELD_KIND_UTF8; entry->value.bytes = value; } _PYRMQ_INLINE void AMQArray_SetIntValue(amqp_array_t *array, int value) { amqp_field_value_t *entry = AMQArray_AddEntry(array); entry->kind = AMQP_FIELD_KIND_I32; entry->value.i32 = value; } static amqp_table_t PyDict_ToAMQTable(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool) { PyObject *dkey = NULL; PyObject *dvalue = NULL; Py_ssize_t size = 0; Py_ssize_t pos = 0; uint64_t clong_value = 0; double cdouble_value = 0.0; int is_unicode = 0; amqp_table_t dst = amqp_empty_table; size = PyDict_Size(src); /* allocate new table */ dst.num_entries = 0; dst.entries = amqp_pool_alloc(pool, size * sizeof(amqp_table_entry_t)); while (PyDict_Next(src, &pos, &dkey, &dvalue)) { dkey = Maybe_Unicode(dkey); if (dvalue == Py_None) { /* None */ AMQTable_SetNilValue(&dst, PyString_AS_AMQBYTES(dkey)); } else if (PyDict_Check(dvalue)) { /* Dict */ AMQTable_SetTableValue(&dst, PyString_AS_AMQBYTES(dkey), PyDict_ToAMQTable(conn, dvalue, pool)); } else if (PyList_Check(dvalue) || PyTuple_Check(dvalue)) { /* List */ AMQTable_SetArrayValue(&dst, PyString_AS_AMQBYTES(dkey), PyIter_ToAMQArray(conn, dvalue, pool)); } else if (PyBool_Check(dvalue)) { /* Bool */ clong_value = 0; /* default false */ if (dvalue == Py_True) clong_value = 1; AMQTable_SetBoolValue(&dst, PyString_AS_AMQBYTES(dkey), clong_value); } else if (PyLong_Check(dvalue) || PyInt_Check(dvalue)) { /* Int | Long */ clong_value = (int64_t)PyLong_AsLong(dvalue); if (clong_value == -1) goto error; AMQTable_SetIntValue(&dst, PyString_AS_AMQBYTES(dkey), clong_value ); } else if (PyFloat_Check(dvalue)) { cdouble_value = PyFloat_AsDouble(dvalue); if (cdouble_value == -1) goto error; AMQTable_SetDoubleValue(&dst, PyString_AS_AMQBYTES(dkey), cdouble_value ); } else { /* String | Unicode */ is_unicode = PyUnicode_Check(dvalue); if (is_unicode || PyBytes_Check(dvalue)) { if (is_unicode) { if ((dvalue = PyUnicode_AsASCIIString(dvalue)) == NULL) goto error; } AMQTable_SetStringValue(&dst, PyString_AS_AMQBYTES(dkey), PyString_AS_AMQBYTES(dvalue) ); } else { /* unsupported type */ PyErr_Format(PyExc_ValueError, "Table member %s is of an unsupported type", PyBytes_AS_STRING(dkey)); goto error; } } } return dst; error: assert(PyErr_Occurred()); return amqp_empty_table; } static amqp_array_t PyIter_ToAMQArray(amqp_connection_state_t conn, PyObject *src, amqp_pool_t *pool) { Py_ssize_t pos = 0; uint64_t clong_value = 0; int is_unicode = 0; amqp_array_t dst = amqp_empty_array; Py_ssize_t size = PySequence_Size(src); if (size == -1) return dst; PyObject *iterator = PyObject_GetIter(src); if (iterator == NULL) return dst; PyObject *item = NULL; /* allocate new amqp array */ dst.num_entries = 0; dst.entries = amqp_pool_alloc(pool, size * sizeof(amqp_field_value_t)); while (item = PyIter_Next(iterator)) { if (item == Py_None) { /* None */ AMQArray_SetNilValue(&dst); } else if (PyDict_Check(item)) { /* Dict */ AMQArray_SetTableValue( &dst, PyDict_ToAMQTable(conn, item, pool)); } else if (PyList_Check(item) || PyTuple_Check(item)) { /* List */ AMQArray_SetArrayValue( &dst, PyIter_ToAMQArray(conn, item, pool)); } else if (PyLong_Check(item) || PyInt_Check(item)) { /* Int | Long */ clong_value = (int64_t)PyLong_AsLongLong(item); AMQArray_SetIntValue(&dst, clong_value); } else { /* String | Unicode */ is_unicode = PyUnicode_Check(item); if (is_unicode || PyBytes_Check(item)) { if (is_unicode) { if ((item = PyUnicode_AsASCIIString(item)) == NULL) goto item_error; } AMQArray_SetStringValue( &dst, PyString_AS_AMQBYTES(item)); } else { /* unsupported type */ PyErr_Format(PyExc_ValueError, "Array member at index %lu, %R, is of an unsupported type", pos, item); goto item_error; } } Py_XDECREF(item); } return dst; item_error: Py_XDECREF(item); error: Py_XDECREF(iterator); assert(PyErr_Occurred()); return dst; } _PYRMQ_INLINE int64_t RabbitMQ_now_usec(void) { struct timeval tv; gettimeofday(&tv, NULL); return (int64_t)tv.tv_sec * 10e5 + (int64_t)tv.tv_usec; } _PYRMQ_INLINE int RabbitMQ_wait_nb(int sockfd) { register int result = 0; fd_set fdset; struct timeval tv = {0, 0}; FD_ZERO(&fdset); FD_SET(sockfd, &fdset); result = select(sockfd + 1, &fdset, NULL, NULL, &tv); if (result <= 0) return result; if (FD_ISSET(sockfd, &fdset)) return 1; return 0; } _PYRMQ_INLINE int RabbitMQ_wait_timeout(int sockfd, double timeout) { int64_t t1, t2; register int result = 0; fd_set fdset; struct timeval tv; while (timeout > 0.0) { FD_ZERO(&fdset); FD_SET(sockfd, &fdset); tv.tv_sec = (int)timeout; tv.tv_usec = (int)((timeout - tv.tv_sec) * 1e6); t1 = RabbitMQ_now_usec(); result = select(sockfd + 1, &fdset, NULL, NULL, &tv); if (result <= 0) break; if (FD_ISSET(sockfd, &fdset)) { result = 1; break; } t2 = RabbitMQ_now_usec(); timeout -= (double)(t2 / 1e6) - (t1 / 1e6); } return result; } _PYRMQ_INLINE void basic_properties_to_PyDict(amqp_basic_properties_t *props, PyObject *p) { register PyObject *value = NULL; if (props->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { value = PySTRING_FROM_AMQBYTES(props->content_type); PyDICT_SETSTR_DECREF(p, "content_type", value); } if (props->_flags & AMQP_BASIC_CONTENT_ENCODING_FLAG) { value = PySTRING_FROM_AMQBYTES(props->content_encoding); PyDICT_SETSTR_DECREF(p, "content_encoding", value); } if (props->_flags & AMQP_BASIC_CORRELATION_ID_FLAG) { value = PySTRING_FROM_AMQBYTES(props->correlation_id); PyDICT_SETSTR_DECREF(p, "correlation_id", value); } if (props->_flags & AMQP_BASIC_REPLY_TO_FLAG) { value = PySTRING_FROM_AMQBYTES(props->reply_to); PyDICT_SETSTR_DECREF(p, "reply_to", value); } if (props->_flags & AMQP_BASIC_EXPIRATION_FLAG) { value = PySTRING_FROM_AMQBYTES(props->expiration); PyDICT_SETSTR_DECREF(p, "expiration", value); } if (props->_flags & AMQP_BASIC_MESSAGE_ID_FLAG) { value = PySTRING_FROM_AMQBYTES(props->message_id); PyDICT_SETSTR_DECREF(p, "message_id", value); } if (props->_flags & AMQP_BASIC_TYPE_FLAG) { value = PySTRING_FROM_AMQBYTES(props->type); PyDICT_SETSTR_DECREF(p, "type", value); } if (props->_flags & AMQP_BASIC_USER_ID_FLAG) { value = PySTRING_FROM_AMQBYTES(props->user_id); PyDICT_SETSTR_DECREF(p, "user_id", value); } if (props->_flags & AMQP_BASIC_APP_ID_FLAG) { value = PySTRING_FROM_AMQBYTES(props->app_id); PyDICT_SETSTR_DECREF(p, "app_id", value); } if (props->_flags & AMQP_BASIC_DELIVERY_MODE_FLAG) { value = PyInt_FromLong(props->delivery_mode); PyDICT_SETSTR_DECREF(p, "delivery_mode", value); } if (props->_flags & AMQP_BASIC_PRIORITY_FLAG) { value =PyInt_FromLong(props->priority); PyDICT_SETSTR_DECREF(p, "priority", value); } if (props->_flags & AMQP_BASIC_TIMESTAMP_FLAG) { value = PyInt_FromLong(props->timestamp); PyDICT_SETSTR_DECREF(p, "timestamp", value); } if (props->_flags & AMQP_BASIC_HEADERS_FLAG) { value = AMQTable_toPyDict(&(props->headers)); PyDICT_SETSTR_DECREF(p, "headers", value); } } static PyObject* AMQTable_toPyDict(amqp_table_t *table) { register PyObject *key = NULL; register PyObject *value = NULL; PyObject *dict = NULL; dict = PyDict_New(); if (table) { int i; for (i = 0; i < table->num_entries; ++i, key=value=NULL) { switch (table->entries[i].value.kind) { case AMQP_FIELD_KIND_VOID: value = Py_None; break; case AMQP_FIELD_KIND_BOOLEAN: value = PyBool_FromLong(AMQTable_VAL(table, i, boolean)); break; case AMQP_FIELD_KIND_I8: value = PyInt_FromLong(AMQTable_VAL(table, i, i8)); break; case AMQP_FIELD_KIND_I16: value = PyInt_FromLong(AMQTable_VAL(table, i, i16)); break; case AMQP_FIELD_KIND_I32: value = PyInt_FromLong(AMQTable_VAL(table, i, i32)); break; case AMQP_FIELD_KIND_I64: value = PyLong_FromLong(AMQTable_VAL(table, i, i64)); break; case AMQP_FIELD_KIND_U8: value = PyLong_FromUnsignedLong( AMQTable_VAL(table, i, u8)); break; case AMQP_FIELD_KIND_U16: value = PyLong_FromUnsignedLong( AMQTable_VAL(table, i, u16)); break; case AMQP_FIELD_KIND_U32: value = PyLong_FromUnsignedLong( AMQTable_VAL(table, i, u32)); break; case AMQP_FIELD_KIND_U64: value = PyLong_FromUnsignedLong( AMQTable_VAL(table, i, u64)); break; case AMQP_FIELD_KIND_F32: value = PyFloat_FromDouble(AMQTable_VAL(table, i, f32)); break; case AMQP_FIELD_KIND_F64: value = PyFloat_FromDouble(AMQTable_VAL(table, i, f64)); break; case AMQP_FIELD_KIND_UTF8: value = PySTRING_FROM_AMQBYTES( AMQTable_VAL(table, i, bytes)); break; case AMQP_FIELD_KIND_TABLE: value = AMQTable_toPyDict(&(AMQTable_VAL(table, i, table))); break; case AMQP_FIELD_KIND_ARRAY: value = AMQArray_toPyList(&(AMQTable_VAL(table, i, array))); break; } key = AMQTable_TO_PYKEY(table, i); if (value == Py_None || value == NULL) /* unsupported type */ PyDICT_SETNONE_DECREF(dict, key); else PyDICT_SETKV_DECREF(dict, key, value); } } return dict; } static PyObject* AMQArray_toPyList(amqp_array_t *array) { register PyObject *value = NULL; PyObject *list = NULL; list = PyList_New((Py_ssize_t) array->num_entries); if (array) { int i; for (i = 0; i < array->num_entries; ++i, value=NULL) { switch (array->entries[i].kind) { case AMQP_FIELD_KIND_BOOLEAN: value = PyBool_FromLong(AMQArray_VAL(array, i, boolean)); break; case AMQP_FIELD_KIND_I8: value = PyInt_FromLong(AMQArray_VAL(array, i, i8)); break; case AMQP_FIELD_KIND_I16: value = PyInt_FromLong(AMQArray_VAL(array, i, i16)); break; case AMQP_FIELD_KIND_I32: value = PyInt_FromLong(AMQArray_VAL(array, i, i32)); break; case AMQP_FIELD_KIND_I64: value = PyLong_FromLong(AMQArray_VAL(array, i, i64)); break; case AMQP_FIELD_KIND_U8: value = PyLong_FromUnsignedLong(AMQArray_VAL(array, i, u8)); break; case AMQP_FIELD_KIND_U16: value = PyLong_FromUnsignedLong( AMQArray_VAL(array, i, u16)); break; case AMQP_FIELD_KIND_U32: value = PyLong_FromUnsignedLong( AMQArray_VAL(array, i, u32)); break; case AMQP_FIELD_KIND_U64: value = PyLong_FromUnsignedLong( AMQArray_VAL(array, i, u64)); break; case AMQP_FIELD_KIND_F32: value = PyFloat_FromDouble(AMQArray_VAL(array, i, f32)); break; case AMQP_FIELD_KIND_F64: value = PyFloat_FromDouble(AMQArray_VAL(array, i, f64)); break; case AMQP_FIELD_KIND_UTF8: value = PySTRING_FROM_AMQBYTES( AMQArray_VAL(array, i, bytes)); break; case AMQP_FIELD_KIND_TABLE: value = AMQTable_toPyDict(&(AMQArray_VAL(array, i, table))); break; case AMQP_FIELD_KIND_ARRAY: value = AMQArray_toPyList(&(AMQArray_VAL(array, i, array))); break; default: /* unsupported type */ Py_INCREF(Py_None); value = Py_None; break; } PyList_SET_ITEM(list, i, value); } } return list; } _PYRMQ_INLINE int PyDict_to_basic_properties(PyObject *p, amqp_basic_properties_t *props, amqp_connection_state_t conn, amqp_pool_t *pool) { PyObject *value = NULL; props->headers = amqp_empty_table; props->_flags = AMQP_BASIC_HEADERS_FLAG; if ((value = PyDict_GetItemString(p, "content_type")) != NULL) { if ((value = Maybe_Unicode(value)) == NULL) return -1; props->content_type = PyString_AS_AMQBYTES(value); props->_flags |= AMQP_BASIC_CONTENT_TYPE_FLAG; } if ((value = PyDict_GetItemString(p, "content_encoding")) != NULL) { if ((value = Maybe_Unicode(value)) == NULL) return -1; props->content_encoding = PyString_AS_AMQBYTES(value); props->_flags |= AMQP_BASIC_CONTENT_ENCODING_FLAG; } if ((value = PyDict_GetItemString(p, "correlation_id")) != NULL) { if ((value = Maybe_Unicode(value)) == NULL) return -1; props->correlation_id = PyString_AS_AMQBYTES(value); props->_flags |= AMQP_BASIC_CORRELATION_ID_FLAG; } if ((value = PyDict_GetItemString(p, "reply_to")) != NULL) { if ((value = Maybe_Unicode(value)) == NULL) return -1; props->reply_to = PyString_AS_AMQBYTES(value); props->_flags |= AMQP_BASIC_REPLY_TO_FLAG; } if ((value = PyDict_GetItemString(p, "expiration")) != NULL) { if ((value = Maybe_Unicode(value)) == NULL) return -1; props->expiration = PyString_AS_AMQBYTES(value); props->_flags |= AMQP_BASIC_EXPIRATION_FLAG; } if ((value = PyDict_GetItemString(p, "message_id")) != NULL) { if ((value = Maybe_Unicode(value)) == NULL) return -1; props->message_id = PyString_AS_AMQBYTES(value); props->_flags |= AMQP_BASIC_MESSAGE_ID_FLAG; } if ((value = PyDict_GetItemString(p, "type")) != NULL) { if ((value = Maybe_Unicode(value)) == NULL) return -1; props->type = PyString_AS_AMQBYTES(value); props->_flags |= AMQP_BASIC_TYPE_FLAG; } if ((value = PyDict_GetItemString(p, "user_id")) != NULL) { if ((value = Maybe_Unicode(value)) == NULL) return -1; props->user_id = PyString_AS_AMQBYTES(value); props->_flags |= AMQP_BASIC_USER_ID_FLAG; } if ((value = PyDict_GetItemString(p, "app_id")) != NULL) { if ((value = Maybe_Unicode(value)) == NULL) return -1; props->app_id = PyString_AS_AMQBYTES(value); props->_flags |= AMQP_BASIC_APP_ID_FLAG; } if ((value = PyDict_GetItemString(p, "delivery_mode")) != NULL) { props->delivery_mode = (uint8_t)PyInt_AS_LONG(value); props->_flags |= AMQP_BASIC_DELIVERY_MODE_FLAG; } if ((value = PyDict_GetItemString(p, "priority")) != NULL) { props->priority = (uint8_t)PyInt_AS_LONG(value); props->_flags |= AMQP_BASIC_PRIORITY_FLAG; } if ((value = PyDict_GetItemString(p, "timestamp")) != NULL) { props->timestamp = (uint8_t)PyInt_AS_LONG(value); props->_flags |= AMQP_BASIC_TIMESTAMP_FLAG; } if ((value = PyDict_GetItemString(p, "headers")) != NULL) { props->headers = PyDict_ToAMQTable(conn, value, pool); if (PyErr_Occurred()) return -1; } return 1; } _PYRMQ_INLINE void amqp_basic_deliver_to_PyDict(PyObject *dest, uint64_t delivery_tag, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_boolean_t redelivered) { PyObject *value = NULL; /* -- delivery_tag (PyInt) */ value = PyLong_FromLongLong(delivery_tag); PyDICT_SETSTR_DECREF(dest, "delivery_tag", value); /* -- exchange (PyString) */ value = PySTRING_FROM_AMQBYTES(exchange); PyDICT_SETSTR_DECREF(dest, "exchange", value); /* -- routing_key (PyString) */ value = PySTRING_FROM_AMQBYTES(routing_key); PyDICT_SETSTR_DECREF(dest, "routing_key", value); /* -- redelivered (PyBool) */ value = PyBool_FromLong((long)redelivered); PyDICT_SETSTR_DECREF(dest, "redelivered", value); return; } /* ------: Error Handlers :----------------------------------------------- */ int PyRabbitMQ_HandleError(int ret, char const *context) { if (ret < 0) { char errorstr[1024]; snprintf(errorstr, sizeof(errorstr), "%s: %s", context, strerror(-ret)); PyErr_SetString(PyRabbitMQExc_ConnectionError, errorstr); return 0; } return 1; } _PYRMQ_INLINE int PyRabbitMQ_HandlePollError(int ready) { if (ready < 0 && !PyErr_Occurred()) PyErr_SetFromErrno(PyExc_OSError); if (!ready && !PyErr_Occurred()) PyErr_SetString(PyRabbitMQ_socket_timeout, "timed out"); return ready; } int PyRabbitMQ_HandleAMQStatus(int status, const char *context) { char errorstr[1024]; if (status) { snprintf(errorstr, sizeof(errorstr), "%s: %s", context, amqp_error_string2(status)); PyErr_SetString(PyRabbitMQExc_ConnectionError, errorstr); } return status; } int PyRabbitMQ_HandleAMQError(PyRabbitMQ_Connection *self, unsigned int channel, amqp_rpc_reply_t reply, const char *context) { char errorstr[1024]; switch (reply.reply_type) { case AMQP_RESPONSE_NORMAL: return 0; case AMQP_RESPONSE_NONE: snprintf(errorstr, sizeof(errorstr), "%s: missing RPC reply type!", context); goto connerror; case AMQP_RESPONSE_LIBRARY_EXCEPTION: snprintf(errorstr, sizeof(errorstr), "%s: %s", context, reply.library_error ? amqp_error_string2(reply.library_error) : "(end-of-stream)"); goto connerror; case AMQP_RESPONSE_SERVER_EXCEPTION: switch (reply.reply.id) { case AMQP_CONNECTION_CLOSE_METHOD: { amqp_connection_close_t *m = (amqp_connection_close_t *) reply.reply.decoded; snprintf(errorstr, sizeof(errorstr), "%s: server connection error %d, message: %.*s", context, m->reply_code, (int) m->reply_text.len, (char *) m->reply_text.bytes); goto connerror; } case AMQP_CHANNEL_CLOSE_METHOD: { amqp_channel_close_t *m = (amqp_channel_close_t *) reply.reply.decoded; snprintf(errorstr, sizeof(errorstr), "%s: server channel error %d, message: %.*s", context, m->reply_code, (int) m->reply_text.len, (char *) m->reply_text.bytes); goto chanerror; } default: snprintf(errorstr, sizeof(errorstr), "%s: unknown server error, method id 0x%08X", context, reply.reply.id); goto connerror; } break; } connerror: PyErr_SetString(PyRabbitMQExc_ConnectionError, errorstr); PyRabbitMQ_Connection_close(self); return PYRABBITMQ_CONNECTION_ERROR; chanerror: PyErr_SetString(PyRabbitMQExc_ChannelError, errorstr); PyRabbitMQ_revive_channel(self, channel); return PYRABBITMQ_CHANNEL_ERROR; } void PyRabbitMQ_SetErr_UnexpectedHeader(amqp_frame_t* frame) { char errorstr[1024]; snprintf(errorstr, sizeof(errorstr), "Unexpected header %d", frame->frame_type); PyErr_SetString(PyRabbitMQExc_ChannelError, errorstr); } unsigned int PyRabbitMQ_revive_channel(PyRabbitMQ_Connection *self, unsigned int channel) { int status = -1; amqp_channel_close_ok_t req; status = amqp_send_method(self->conn, (amqp_channel_t)channel, AMQP_CHANNEL_CLOSE_OK_METHOD, &req); if (status < 0) { PyErr_SetString(PyRabbitMQExc_ConnectionError, "Couldn't revive channel"); PyRabbitMQ_Connection_close(self); return 1; } return PyRabbitMQ_Connection_create_channel(self, channel); } /* ------: Connection :--------------------------------------------------- */ /* * Connection.__new__() * */ static PyRabbitMQ_Connection* PyRabbitMQ_ConnectionType_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) { PyRabbitMQ_Connection *self; self = (PyRabbitMQ_Connection *)PyType_GenericNew(type, args, kwargs); if (self != NULL) { self->conn = NULL; self->hostname = NULL; self->userid = NULL; self->password = NULL; self->virtual_host = NULL; self->port = 5672; self->sockfd = 0; self->connected = 0; self->server_properties = NULL; self->callbacks = NULL; } return self; } /* * Connection.__del__() */ static void PyRabbitMQ_ConnectionType_dealloc(PyRabbitMQ_Connection *self) { if (self->weakreflist != NULL) PyObject_ClearWeakRefs((PyObject*)self); if (self->hostname != NULL) PyMem_Free(self->hostname); if (self->userid != NULL) PyMem_Free(self->userid); if (self->password != NULL) PyMem_Free(self->password); if (self->virtual_host != NULL) PyMem_Free(self->virtual_host); Py_XDECREF(self->callbacks); Py_XDECREF(self->client_properties); Py_XDECREF(self->server_properties); Py_TYPE(self)->tp_free(self); } /* * Connection.__init__() */ static int PyRabbitMQ_ConnectionType_init(PyRabbitMQ_Connection *self, PyObject *args, PyObject *kwargs) { static char *kwlist[] = { "hostname", "userid", "password", "virtual_host", "port", "channel_max", "frame_max", "heartbeat", "client_properties", NULL }; char *hostname; char *userid; char *password; char *virtual_host; int channel_max = 0xffff; int frame_max = 131072; int heartbeat = 0; int port = 5672; PyObject *client_properties = NULL; if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|ssssiiiiO", kwlist, &hostname, &userid, &password, &virtual_host, &port, &channel_max, &frame_max, &heartbeat, &client_properties)) { return -1; } self->hostname = PyMem_Malloc(strlen(hostname) + 1); self->userid = PyMem_Malloc(strlen(userid) + 1); self->password = PyMem_Malloc(strlen(password) + 1); self->virtual_host = PyMem_Malloc(strlen(virtual_host) + 1); if (self->hostname == NULL || self->userid == NULL || self->password == NULL || self->virtual_host == NULL) { return PyErr_NoMemory(); } strcpy(self->hostname, hostname); strcpy(self->userid, userid); strcpy(self->password, password); strcpy(self->virtual_host, virtual_host); self->port = port; self->channel_max = channel_max; self->frame_max = frame_max; self->heartbeat = heartbeat; self->weakreflist = NULL; self->callbacks = PyDict_New(); if (self->callbacks == NULL) return -1; Py_XINCREF(client_properties); self->client_properties = client_properties; self->server_properties = NULL; return 0; } /* * Connection.fileno() */ static PyObject* PyRabbitMQ_Connection_fileno(PyRabbitMQ_Connection *self) { if (self->sockfd > 0) { return PyInt_FromLong((long)self->sockfd); } else { PyErr_SetString(PyExc_ValueError, "Socket not connected"); return 0; } } /* * Connection.connect() */ static PyObject* PyRabbitMQ_Connection_connect(PyRabbitMQ_Connection *self) { int status; amqp_socket_t *socket = NULL; amqp_rpc_reply_t reply; amqp_pool_t pool; amqp_table_t properties; if (self->connected) { PyErr_SetString(PyRabbitMQExc_ConnectionError, "Already connected"); goto bail; } Py_BEGIN_ALLOW_THREADS; self->conn = amqp_new_connection(); socket = amqp_tcp_socket_new(self->conn); Py_END_ALLOW_THREADS; if (!socket) { PyErr_NoMemory(); goto error; } Py_BEGIN_ALLOW_THREADS; status = amqp_socket_open(socket, self->hostname, self->port); Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandleAMQStatus(status, "Error opening socket")) { goto error; } Py_BEGIN_ALLOW_THREADS; self->sockfd = amqp_socket_get_sockfd(socket); if (self->client_properties != NULL && PyDict_Check(self->client_properties)) { init_amqp_pool(&pool, self->frame_max); properties = PyDict_ToAMQTable(self->conn, self->client_properties, &pool); reply = amqp_login_with_properties(self->conn, self->virtual_host, self->channel_max, self->frame_max, self->heartbeat, &properties, AMQP_SASL_METHOD_PLAIN, self->userid, self->password); } else { reply = amqp_login(self->conn, self->virtual_host, self->channel_max, self->frame_max, self->heartbeat, AMQP_SASL_METHOD_PLAIN, self->userid, self->password); } Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandleAMQError(self, 0, reply, "Couldn't log in")) goto bail; /* after tune */ self->connected = 1; self->channel_max = self->conn->channel_max; self->frame_max = self->conn->frame_max; self->heartbeat = self->conn->heartbeat; self->server_properties = AMQTable_toPyDict(amqp_get_server_properties(self->conn)); Py_RETURN_NONE; error: PyRabbitMQ_Connection_close(self); bail: return 0; } /* * Connection._close() */ static PyObject* PyRabbitMQ_Connection_close(PyRabbitMQ_Connection *self) { amqp_rpc_reply_t reply; if (self->connected) { self->connected = 0; Py_BEGIN_ALLOW_THREADS reply = amqp_connection_close(self->conn, AMQP_REPLY_SUCCESS); amqp_destroy_connection(self->conn); self->sockfd = 0; Py_END_ALLOW_THREADS } Py_RETURN_NONE; } unsigned int PyRabbitMQ_Connection_create_channel(PyRabbitMQ_Connection *self, unsigned int channel) { amqp_rpc_reply_t reply; Py_BEGIN_ALLOW_THREADS; amqp_channel_open(self->conn, channel); reply = amqp_get_rpc_reply(self->conn); Py_END_ALLOW_THREADS; return PyRabbitMQ_HandleAMQError(self, 0, reply, "Couldn't create channel"); } /* * Connection._channel_open */ static PyObject * PyRabbitMQ_Connection_channel_open(PyRabbitMQ_Connection *self, PyObject *args) { unsigned int channel; if (PyRabbitMQ_Not_Connected(self)) goto bail; if (!PyArg_ParseTuple(args, "I", &channel)) goto bail; if (PyRabbitMQ_Connection_create_channel(self, channel)) goto bail;; Py_RETURN_NONE; bail: return 0; } /* * Connection._channel_close */ unsigned int PyRabbitMQ_Connection_destroy_channel(PyRabbitMQ_Connection *self, unsigned int channel) { amqp_rpc_reply_t reply; Py_BEGIN_ALLOW_THREADS; reply = amqp_channel_close(self->conn, channel, AMQP_REPLY_SUCCESS); amqp_maybe_release_buffers_on_channel(self->conn, channel); Py_END_ALLOW_THREADS; return PyRabbitMQ_HandleAMQError(self, channel, reply, "Couldn't close channel"); } static PyObject* PyRabbitMQ_Connection_channel_close(PyRabbitMQ_Connection *self, PyObject *args) { unsigned int channel = 0; if (PyRabbitMQ_Not_Connected(self)) goto error; if (!PyArg_ParseTuple(args, "I", &channel)) goto error; if (PyRabbitMQ_Connection_destroy_channel(self, channel)) goto error; Py_RETURN_NONE; error: return 0; } _PYRMQ_INLINE int PyRabbitMQ_ApplyCallback(PyRabbitMQ_Connection *self, PyObject *consumer_tag, PyObject *channel, PyObject *propdict, PyObject *delivery_info, PyObject *view) { int retval = 0; PyObject *channel_callbacks = NULL; PyObject *callback_for_tag = NULL; PyObject *channels = NULL; PyObject *channelobj = NULL; PyObject *Message = NULL; PyObject *message = NULL; PyObject *args = NULL; PyObject *callback_result = NULL; /* self.callbacks */ if (!(channel_callbacks = PyDict_GetItem(self->callbacks, channel))) return -1; /* self.callbacks[consumer_tag] */ if (!(callback_for_tag = PyDict_GetItem(channel_callbacks, consumer_tag))) goto error; /* self.channels */ if (!(channels = PyObject_GetAttrString((PyObject *)self, "channels"))) goto error; /* self.channels[channel] */ if (!(channelobj = PyDict_GetItem(channels, channel))) goto error; /* message = self.Message(channel, properties, delivery_info, body) */ Message = BUILD_METHOD_NAME("Message"); message = PyObject_CallMethodObjArgs((PyObject *)self, Message, channelobj, propdict, delivery_info, view, NULL); if (!message) goto error; /* callback(message) */ if ((args = PyTuple_New(1)) == NULL) { Py_DECREF(message); goto finally; } PyTuple_SET_ITEM(args, 0, message); callback_result = PyObject_CallObject(callback_for_tag, args); Py_XDECREF(callback_result); goto finally; error: retval = -1; finally: Py_XDECREF(args); Py_XDECREF(channels); Py_XDECREF(Message); return retval; } void PyRabbitMQ_SetErr_ReceivedFrame(PyRabbitMQ_Connection *self, amqp_frame_t* frame) { static char errstr[512]; switch(frame->payload.method.id) { case AMQP_CHANNEL_CLOSE_METHOD: { amqp_channel_close_t *chanm = (amqp_channel_close_t *)frame->payload.method.decoded; snprintf(errstr, sizeof(errstr), "channel error %d, message: %.*s", chanm->reply_code, (int) chanm->reply_text.len, (char *) chanm->reply_text.bytes); PyErr_SetString(PyRabbitMQExc_ChannelError, errstr); PyRabbitMQ_revive_channel(self, frame->channel); break; } case AMQP_CONNECTION_CLOSE_METHOD: { amqp_connection_close_t *connm = (amqp_connection_close_t *)frame->payload.method.decoded; snprintf(errstr, sizeof(errstr), "server connection error %d message: %.*s", connm->reply_code, (int) connm->reply_text.len, (char *) connm->reply_text.bytes); PyErr_SetString(PyRabbitMQExc_ConnectionError, errstr); PyRabbitMQ_Connection_close(self); break; } default: { PyErr_SetString(PyRabbitMQExc_ConnectionError, "Bad frame read"); break; } } } int PyRabbitMQ_recv(PyRabbitMQ_Connection *self, PyObject *p, amqp_connection_state_t conn, int piggyback) { amqp_frame_t frame; amqp_channel_t cur_channel = 0; amqp_basic_deliver_t *deliver; amqp_basic_properties_t *props; Py_ssize_t body_target; Py_ssize_t body_received; PyObject *channel = NULL; PyObject *consumer_tag = NULL; PyObject *delivery_info = NULL; PyObject *propdict = PyDict_New(); PyObject *payload = NULL; PyObject *view = NULL; char *buf = NULL; char *bufp = NULL; unsigned int i = 0; register unsigned int j = 0; int retval = 0; memset(&props, 0, sizeof(props)); while (1) { if (!piggyback) { Py_BEGIN_ALLOW_THREADS; amqp_maybe_release_buffers(conn); retval = amqp_simple_wait_frame(conn, &frame); Py_END_ALLOW_THREADS; if (retval < 0) break; if (frame.frame_type != AMQP_FRAME_METHOD) continue; if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) goto altframe; cur_channel = frame.channel; delivery_info = PyDict_New(); deliver = (amqp_basic_deliver_t *)frame.payload.method.decoded; /* need consumer tag for later. * add delivery info to the delivery_info dict. */ amqp_basic_deliver_to_PyDict(delivery_info, deliver->delivery_tag, deliver->exchange, deliver->routing_key, deliver->redelivered); /* add in the consumer_tag */ consumer_tag = PySTRING_FROM_AMQBYTES(deliver->consumer_tag); PyDict_SetItemString(delivery_info, "consumer_tag", consumer_tag); Py_XDECREF(consumer_tag); piggyback = 0; } Py_BEGIN_ALLOW_THREADS; retval = cur_channel == 0 ? amqp_simple_wait_frame(conn, &frame) : amqp_simple_wait_frame_on_channel(conn, cur_channel, &frame); Py_END_ALLOW_THREADS; if (retval < 0) break; if (frame.frame_type == AMQP_FRAME_METHOD) goto altframe; if (frame.frame_type != AMQP_FRAME_HEADER) { PyRabbitMQ_SetErr_UnexpectedHeader(&frame); goto finally; } /* if piggybacked, 'channel' is still 0 at this point */ cur_channel = frame.channel; /* channel */ channel = PyInt_FromLong((unsigned long)frame.channel); /* properties */ props = (amqp_basic_properties_t *)frame.payload.properties.decoded; basic_properties_to_PyDict(props, propdict); /* body */ body_target = frame.payload.properties.body_size; body_received = 0; for (i = 0; body_received < body_target; i++) { Py_BEGIN_ALLOW_THREADS; retval = amqp_simple_wait_frame_on_channel(conn, cur_channel, &frame); Py_END_ALLOW_THREADS; if (retval < 0) break; if (frame.frame_type == AMQP_FRAME_METHOD) goto altframe; if (frame.frame_type != AMQP_FRAME_BODY) { PyErr_SetString(PyRabbitMQExc_ChannelError, "Expected body, got unexpected frame"); goto finally; } bufp = frame.payload.body_fragment.bytes; body_received += frame.payload.body_fragment.len; if (!i) { if (body_received < body_target) { payload = PyBytes_FromStringAndSize(NULL, (Py_ssize_t)body_target); if (!payload) goto finally; buf = PyBytes_AsString(payload); if (!buf) goto finally; view = PyMemoryView_FromObject(payload); } else { if (p) { payload = PySTRING_FROM_AMQBYTES( frame.payload.body_fragment); } else { view = buffer_toMemoryView(bufp, (Py_ssize_t)frame.payload.body_fragment.len); } break; } } for (j = 0; j < frame.payload.body_fragment.len; *buf++ = *bufp++, j++); } if (p) { if (!payload) { /* expected content, got none */ if (body_target) goto error; /* did not expect content, return empty string */ payload = PyBytes_FromStringAndSize(NULL, 0); } PyDict_SetItemString(p, "properties", propdict); PyDict_SetItemString(p, "body", payload); PyDict_SetItemString(p, "channel", channel); retval = 0; } else { if (!view) goto error; retval = PyRabbitMQ_ApplyCallback(self, consumer_tag, channel, propdict, delivery_info, view); } break; } goto finally; altframe: PyRabbitMQ_SetErr_ReceivedFrame(self, &frame); error: retval = -1; finally: Py_XDECREF(payload); Py_XDECREF(channel); Py_XDECREF(propdict); Py_XDECREF(delivery_info); Py_XDECREF(view); return retval; } /* * Connection._queue_bind */ static PyObject* PyRabbitMQ_Connection_queue_bind(PyRabbitMQ_Connection *self, PyObject *args) { unsigned int channel; PyObject *queue = NULL; PyObject *exchange = NULL; PyObject *routing_key = NULL; PyObject *arguments = NULL; amqp_table_t bargs = amqp_empty_table; amqp_pool_t *channel_pool = NULL; amqp_rpc_reply_t reply; if (PyRabbitMQ_Not_Connected(self)) goto bail; if (!PyArg_ParseTuple(args, "IOOOO", &channel, &queue, &exchange, &routing_key, &arguments)) goto bail; if ((queue = Maybe_Unicode(queue)) == NULL) goto bail; if ((exchange = Maybe_Unicode(exchange)) == NULL) goto bail; if ((routing_key = Maybe_Unicode(routing_key)) == NULL) goto bail; channel_pool = amqp_get_or_create_channel_pool(self->conn, (amqp_channel_t)channel); if (channel_pool == NULL) { PyErr_NoMemory(); goto bail; } bargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool); if (PyErr_Occurred()) goto bail; Py_BEGIN_ALLOW_THREADS; amqp_queue_bind(self->conn, channel, PyString_AS_AMQBYTES(queue), PyString_AS_AMQBYTES(exchange), PyString_AS_AMQBYTES(routing_key), bargs); reply = amqp_get_rpc_reply(self->conn); amqp_maybe_release_buffers_on_channel(self->conn, channel); Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.bind")) goto bail; Py_RETURN_NONE; bail: return 0; } /* * Connection._queue_unbind */ static PyObject* PyRabbitMQ_Connection_queue_unbind(PyRabbitMQ_Connection *self, PyObject *args) { unsigned int channel; PyObject *queue = NULL; PyObject *exchange = NULL; PyObject *routing_key = NULL; PyObject *arguments = NULL; amqp_table_t uargs = amqp_empty_table; amqp_pool_t *channel_pool = NULL; amqp_rpc_reply_t reply; if (PyRabbitMQ_Not_Connected(self)) goto bail; if (!PyArg_ParseTuple(args, "IOOOO", &channel, &queue, &exchange, &routing_key, &arguments)) goto bail; if ((queue = Maybe_Unicode(queue)) == NULL) goto bail; if ((exchange = Maybe_Unicode(exchange)) == NULL) goto bail; if ((routing_key = Maybe_Unicode(routing_key)) == NULL) goto bail; channel_pool = amqp_get_or_create_channel_pool(self->conn, (amqp_channel_t)channel); if (channel_pool == NULL) { PyErr_NoMemory(); goto bail; } uargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool); if (PyErr_Occurred()) goto bail; Py_BEGIN_ALLOW_THREADS; amqp_queue_unbind(self->conn, channel, PyString_AS_AMQBYTES(queue), PyString_AS_AMQBYTES(exchange), PyString_AS_AMQBYTES(routing_key), uargs); reply = amqp_get_rpc_reply(self->conn); amqp_maybe_release_buffers_on_channel(self->conn, channel); Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.unbind")) goto bail; Py_RETURN_NONE; bail: return 0; } /* * Connection._queue_delete */ static PyObject* PyRabbitMQ_Connection_queue_delete(PyRabbitMQ_Connection *self, PyObject *args) { PyObject *queue = NULL; unsigned int channel = 0; unsigned int if_unused = 0; unsigned int if_empty = 0; amqp_queue_delete_ok_t *ok; amqp_rpc_reply_t reply; if (PyRabbitMQ_Not_Connected(self)) goto bail; if (!PyArg_ParseTuple(args, "IOII", &channel, &queue, &if_unused, &if_empty)) goto bail; if ((queue = Maybe_Unicode(queue)) == NULL) goto bail; Py_BEGIN_ALLOW_THREADS; ok = amqp_queue_delete(self->conn, channel, PyString_AS_AMQBYTES(queue), (amqp_boolean_t)if_unused, (amqp_boolean_t)if_empty); if (ok == NULL) reply = amqp_get_rpc_reply(self->conn); amqp_maybe_release_buffers_on_channel(self->conn, channel); Py_END_ALLOW_THREADS; if (ok == NULL && PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.delete")) goto bail; return PyInt_FromLong((long)ok->message_count); bail: return 0; } /* * Connection._queue_declare */ static PyObject* PyRabbitMQ_Connection_queue_declare(PyRabbitMQ_Connection *self, PyObject *args) { PyObject *queue = NULL; PyObject *arguments = NULL; unsigned int channel = 0; unsigned int passive = 0; unsigned int durable = 0; unsigned int exclusive = 0; unsigned int auto_delete = 0; amqp_queue_declare_ok_t *ok; amqp_rpc_reply_t reply; amqp_pool_t *channel_pool = NULL; amqp_table_t qargs = amqp_empty_table; PyObject *ret = NULL; if (PyRabbitMQ_Not_Connected(self)) goto bail; if (!PyArg_ParseTuple(args, "IOIIIIO", &channel, &queue, &passive, &durable, &exclusive, &auto_delete, &arguments)) goto bail; if ((queue = Maybe_Unicode(queue)) == NULL) goto bail; channel_pool = amqp_get_or_create_channel_pool(self->conn, (amqp_channel_t)channel); if (channel_pool == NULL) { PyErr_NoMemory(); goto bail; } qargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool); if (PyErr_Occurred()) goto bail; Py_BEGIN_ALLOW_THREADS; ok = amqp_queue_declare(self->conn, channel, PyString_AS_AMQBYTES(queue), (amqp_boolean_t)passive, (amqp_boolean_t)durable, (amqp_boolean_t)exclusive, (amqp_boolean_t)auto_delete, qargs ); reply = amqp_get_rpc_reply(self->conn); Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.declare")) goto bail; if ((ret = PyTuple_New(3)) == NULL) goto bail; PyTuple_SET_ITEM(ret, 0, PySTRING_FROM_AMQBYTES(ok->queue)); PyTuple_SET_ITEM(ret, 1, PyInt_FromLong((long)ok->message_count)); PyTuple_SET_ITEM(ret, 2, PyInt_FromLong((long)ok->consumer_count)); return ret; bail: return 0; } /* * Connection._queue_purge */ static PyObject* PyRabbitMQ_Connection_queue_purge(PyRabbitMQ_Connection *self, PyObject *args) { PyObject *queue = NULL; unsigned int channel = 0; unsigned int no_wait = 0; amqp_queue_purge_ok_t *ok; amqp_rpc_reply_t reply; if (PyRabbitMQ_Not_Connected(self)) goto bail; if (!PyArg_ParseTuple(args, "IOI", &channel, &queue, &no_wait)) goto bail; if ((queue = Maybe_Unicode(queue)) == NULL) goto bail; Py_BEGIN_ALLOW_THREADS; ok = amqp_queue_purge(self->conn, channel, PyString_AS_AMQBYTES(queue)); reply = amqp_get_rpc_reply(self->conn); amqp_maybe_release_buffers_on_channel(self->conn, channel); Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandleAMQError(self, channel, reply, "queue.purge")) goto bail; return PyInt_FromLong((long)ok->message_count); bail: return 0; } /* * Connection._exchange_declare */ static PyObject* PyRabbitMQ_Connection_exchange_declare(PyRabbitMQ_Connection *self, PyObject *args) { unsigned int channel = 0; PyObject *exchange = NULL; PyObject *type = NULL; PyObject *arguments = 0; unsigned int passive = 0; unsigned int durable = 0; /* auto_delete argument is ignored, * as it has been decided that it's not that useful after all. */ unsigned int auto_delete = 0; amqp_table_t eargs = amqp_empty_table; amqp_pool_t *channel_pool = NULL; amqp_rpc_reply_t reply; if (PyRabbitMQ_Not_Connected(self)) goto bail; if (!PyArg_ParseTuple(args, "IOOIIIO", &channel, &exchange, &type, &passive, &durable, &auto_delete, &arguments)) goto bail; if ((exchange = Maybe_Unicode(exchange)) == NULL) goto bail; if ((type = Maybe_Unicode(type)) == NULL) goto bail; channel_pool = amqp_get_or_create_channel_pool(self->conn, (amqp_channel_t)channel); if (channel_pool == NULL) { PyErr_NoMemory(); goto bail; } eargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool); if (PyErr_Occurred()) goto bail; Py_BEGIN_ALLOW_THREADS; amqp_exchange_declare(self->conn, channel, PyString_AS_AMQBYTES(exchange), PyString_AS_AMQBYTES(type), (amqp_boolean_t)passive, (amqp_boolean_t)durable, 0, 0, eargs ); reply = amqp_get_rpc_reply(self->conn); Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandleAMQError(self, channel, reply, "exchange.declare")) goto bail; Py_RETURN_NONE; bail: return 0; } /* * Connection._exchange_delete */ static PyObject* PyRabbitMQ_Connection_exchange_delete(PyRabbitMQ_Connection *self, PyObject *args) { PyObject *exchange = NULL; unsigned int channel = 0; unsigned int if_unused = 0; amqp_rpc_reply_t reply; if (PyRabbitMQ_Not_Connected(self)) goto bail; if (!PyArg_ParseTuple(args, "IOI", &channel, &exchange, &if_unused)) goto bail; if ((exchange = Maybe_Unicode(exchange)) == NULL) goto bail; Py_BEGIN_ALLOW_THREADS; amqp_exchange_delete(self->conn, channel, PyString_AS_AMQBYTES(exchange), (amqp_boolean_t)if_unused); reply = amqp_get_rpc_reply(self->conn); amqp_maybe_release_buffers_on_channel(self->conn, channel); Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandleAMQError(self, channel, reply, "exchange.delete")) goto bail; Py_RETURN_NONE; bail: return 0; } /* * Connection._basic_publish */ static PyObject* PyRabbitMQ_Connection_basic_publish(PyRabbitMQ_Connection *self, PyObject *args) { PyObject *exchange = NULL; PyObject *routing_key = NULL; PyObject *propdict; unsigned int channel = 0; unsigned int mandatory = 0; unsigned int immediate = 0; char *body_buf = NULL; Py_ssize_t body_size = 0; int ret = 0; amqp_basic_properties_t props; amqp_bytes_t bytes; amqp_pool_t *channel_pool = NULL; memset(&props, 0, sizeof(props)); if (PyRabbitMQ_Not_Connected(self)) goto bail; if (!PyArg_ParseTuple(args, "Is#OOO|II", &channel, &body_buf, &body_size, &exchange, &routing_key, &propdict, &mandatory, &immediate)) goto bail; if ((exchange = Maybe_Unicode(exchange)) == NULL) goto bail; if ((routing_key = Maybe_Unicode(routing_key)) == NULL) goto bail; Py_INCREF(propdict); channel_pool = amqp_get_or_create_channel_pool(self->conn, (amqp_channel_t)channel); if (PyDict_to_basic_properties(propdict, &props, self->conn, channel_pool) < 1) { goto bail; } Py_DECREF(propdict); bytes.len = (size_t)body_size; bytes.bytes = (void *)body_buf; Py_BEGIN_ALLOW_THREADS; ret = amqp_basic_publish(self->conn, channel, PyString_AS_AMQBYTES(exchange), PyString_AS_AMQBYTES(routing_key), (amqp_boolean_t)mandatory, (amqp_boolean_t)immediate, &props, bytes); amqp_maybe_release_buffers_on_channel(self->conn, channel); Py_END_ALLOW_THREADS; if (!PyRabbitMQ_HandleError(ret, "basic.publish")) { goto error; } Py_RETURN_NONE; error: PyRabbitMQ_revive_channel(self, channel); bail: return 0; } /* * Connection._basic_ack */ static PyObject* PyRabbitMQ_Connection_basic_ack(PyRabbitMQ_Connection *self, PyObject *args) { Py_ssize_t delivery_tag = 0; unsigned int channel = 0; unsigned int multiple = 0; int ret = 0; if (PyRabbitMQ_Not_Connected(self)) goto bail; if (!PyArg_ParseTuple(args, "InI", &channel, &delivery_tag, &multiple)) goto bail; Py_BEGIN_ALLOW_THREADS; ret = amqp_basic_ack(self->conn, channel, (uint64_t)delivery_tag, (amqp_boolean_t)multiple); Py_END_ALLOW_THREADS; if (!PyRabbitMQ_HandleError(ret, "basic.ack")) goto error; Py_RETURN_NONE; error: PyRabbitMQ_revive_channel(self, channel); bail: return 0; } /* * Connection._basic_reject */ static PyObject *PyRabbitMQ_Connection_basic_reject(PyRabbitMQ_Connection *self, PyObject *args) { Py_ssize_t delivery_tag = 0; unsigned int channel = 0; unsigned int multiple = 0; int ret = 0; if (PyRabbitMQ_Not_Connected(self)) goto bail; if (!PyArg_ParseTuple(args, "InI", &channel, &delivery_tag, &multiple)) goto bail; Py_BEGIN_ALLOW_THREADS; ret = amqp_basic_reject(self->conn, channel, (uint64_t)delivery_tag, (amqp_boolean_t)multiple); Py_END_ALLOW_THREADS; if (!PyRabbitMQ_HandleError(ret, "basic.reject")) goto error; Py_RETURN_NONE; error: PyRabbitMQ_revive_channel(self, channel); bail: return 0; } /* * Connection._basic_cancel */ static PyObject* PyRabbitMQ_Connection_basic_cancel(PyRabbitMQ_Connection *self, PyObject *args) { PyObject *consumer_tag = NULL; unsigned int channel = 0; amqp_basic_cancel_ok_t *ok; amqp_rpc_reply_t reply; if (PyRabbitMQ_Not_Connected(self)) goto ok; if (!PyArg_ParseTuple(args, "IO", &channel, &consumer_tag)) goto bail; if ((consumer_tag = Maybe_Unicode(consumer_tag)) == NULL) goto bail; Py_BEGIN_ALLOW_THREADS; ok = amqp_basic_cancel(self->conn, channel, PyString_AS_AMQBYTES(consumer_tag)); reply = amqp_get_rpc_reply(self->conn); amqp_maybe_release_buffers_on_channel(self->conn, channel); Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandleAMQError(self, channel, reply, "basic.cancel")) goto bail; ok: Py_RETURN_NONE; bail: return 0; } /* * Connection._basic_consume */ static PyObject* PyRabbitMQ_Connection_basic_consume(PyRabbitMQ_Connection *self, PyObject *args) { PyObject *queue = NULL; PyObject *consumer_tag = NULL; PyObject *arguments = NULL; unsigned int channel = 0; unsigned int no_local = 0; unsigned int no_ack = 0; unsigned int exclusive = 0; amqp_basic_consume_ok_t *ok; amqp_rpc_reply_t reply; amqp_pool_t *channel_pool = NULL; amqp_table_t cargs = amqp_empty_table; if (PyRabbitMQ_Not_Connected(self)) goto bail; if (!PyArg_ParseTuple(args, "IOOIIIO", &channel, &queue, &consumer_tag, &no_local, &no_ack, &exclusive, &arguments)) goto bail; if ((queue = Maybe_Unicode(queue)) == NULL) goto bail; if ((consumer_tag = Maybe_Unicode(consumer_tag)) == NULL) goto bail; channel_pool = amqp_get_or_create_channel_pool(self->conn, (amqp_channel_t)channel); if (channel_pool == NULL) { PyErr_NoMemory(); goto bail; } cargs = PyDict_ToAMQTable(self->conn, arguments, channel_pool); if (PyErr_Occurred()) goto bail; Py_BEGIN_ALLOW_THREADS; ok = amqp_basic_consume(self->conn, channel, PyString_AS_AMQBYTES(queue), PyString_AS_AMQBYTES(consumer_tag), no_local, no_ack, exclusive, cargs); reply = amqp_get_rpc_reply(self->conn); Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandleAMQError(self, channel, reply, "basic.consume")) goto bail; return PySTRING_FROM_AMQBYTES(ok->consumer_tag); bail: return 0; } /* * Connection._basic_qos */ static PyObject* PyRabbitMQ_Connection_basic_qos(PyRabbitMQ_Connection *self, PyObject *args) { unsigned int channel = 0; Py_ssize_t prefetch_size = 0; unsigned int prefetch_count = 0; unsigned int _global = 0; if (PyRabbitMQ_Not_Connected(self)) goto error; if (!PyArg_ParseTuple(args, "InII", &channel, &prefetch_size, &prefetch_count, &_global)) goto error; Py_BEGIN_ALLOW_THREADS; amqp_basic_qos(self->conn, channel, (uint32_t)prefetch_size, (uint16_t)prefetch_count, (amqp_boolean_t)_global); Py_END_ALLOW_THREADS; Py_RETURN_NONE; error: return 0; } /* * Connection._flow */ static PyObject* PyRabbitMQ_Connection_flow(PyRabbitMQ_Connection *self, PyObject *args) { unsigned int channel = 0; unsigned int active = 1; amqp_channel_flow_ok_t *ok; amqp_rpc_reply_t reply; if (PyRabbitMQ_Not_Connected(self)) goto bail; if (!PyArg_ParseTuple(args, "II", &channel, &active)) goto bail; Py_BEGIN_ALLOW_THREADS; ok = amqp_channel_flow(self->conn, channel, (amqp_boolean_t)active); reply = amqp_get_rpc_reply(self->conn); amqp_maybe_release_buffers_on_channel(self->conn, channel); Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandleAMQError(self, channel, reply, "channel.flow")) goto bail; Py_RETURN_NONE; bail: return 0; } /* * Connection._basic_recover */ static PyObject* PyRabbitMQ_Connection_basic_recover(PyRabbitMQ_Connection *self, PyObject *args) { unsigned int channel = 0; unsigned int requeue = 0; amqp_basic_recover_ok_t *ok; amqp_rpc_reply_t reply; if (PyRabbitMQ_Not_Connected(self)) goto bail; if (!PyArg_ParseTuple(args, "II", &channel, &requeue)) goto bail; Py_BEGIN_ALLOW_THREADS; ok = amqp_basic_recover(self->conn, channel, (amqp_boolean_t)requeue); reply = amqp_get_rpc_reply(self->conn); amqp_maybe_release_buffers_on_channel(self->conn, channel); Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandleAMQError(self, channel, reply, "basic.recover")) goto bail; Py_RETURN_NONE; bail: return 0; } /* * Connection._basic_recv */ static PyObject* PyRabbitMQ_Connection_basic_recv(PyRabbitMQ_Connection *self, PyObject *args) { int ready = 0; double timeout; if (PyRabbitMQ_Not_Connected(self)) goto bail; if (!PyArg_ParseTuple(args, "d", &timeout)) goto bail; if (PYRMQ_SHOULD_POLL(timeout) && !AMQP_ACTIVE_BUFFERS(self->conn)) { Py_BEGIN_ALLOW_THREADS; ready = RabbitMQ_WAIT(self->sockfd, timeout); Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandlePollError(ready) <= 0) goto bail; } if (PyRabbitMQ_recv(self, NULL, self->conn, 0) < 0) { if (!PyErr_Occurred()) PyErr_SetString(PyRabbitMQExc_ConnectionError, "Bad frame read"); goto error; } Py_RETURN_NONE; error: PyRabbitMQ_Connection_close(self); bail: return 0; } /* * Connection.__repr__ */ static PyObject * PyRabbitMQ_Connection_repr(PyRabbitMQ_Connection *self) { return FROM_FORMAT("", self->userid, self->password, self->hostname, self->port, self->virtual_host); } /* Connection._basic_get */ static PyObject* PyRabbitMQ_Connection_basic_get(PyRabbitMQ_Connection *self, PyObject *args) { PyObject *queue = NULL; unsigned int channel = 0; unsigned int no_ack = 0; amqp_rpc_reply_t reply; amqp_basic_get_ok_t *ok = NULL; PyObject *p = NULL; PyObject *delivery_info = NULL; PyObject *message_count = NULL; if (PyRabbitMQ_Not_Connected(self)) goto bail; if (!PyArg_ParseTuple(args, "IOI", &channel, &queue, &no_ack)) goto bail; if ((queue = Maybe_Unicode(queue)) == NULL) goto bail; Py_BEGIN_ALLOW_THREADS; reply = amqp_basic_get(self->conn, channel, PyString_AS_AMQBYTES(queue), (amqp_boolean_t)no_ack); Py_END_ALLOW_THREADS; if (PyRabbitMQ_HandleAMQError(self, channel, reply, "basic.get")) goto bail; if (reply.reply.id != AMQP_BASIC_GET_OK_METHOD) goto empty; ok = (amqp_basic_get_ok_t *)reply.reply.decoded; p = PyDict_New(); /* p["delivery_info"] = {} */ delivery_info = PyDict_New(); PyDICT_SETSTR_DECREF(p, "delivery_info", delivery_info); amqp_basic_deliver_to_PyDict(delivery_info, ok->delivery_tag, ok->exchange, ok->routing_key, ok->redelivered); /* add in the message_count */ message_count = PyLong_FromLong(ok->message_count); PyDict_SetItemString(delivery_info, "message_count", message_count); Py_XDECREF(message_count); if (amqp_data_in_buffer(self->conn)) { if (PyRabbitMQ_recv(self, p, self->conn, 1) < 0) { if (!PyErr_Occurred()) PyErr_SetString(PyRabbitMQExc_ConnectionError, "Bad frame read"); Py_XDECREF(p); Py_XDECREF(delivery_info); goto error; } } return p; error: PyRabbitMQ_Connection_close(self); bail: return 0; empty: Py_RETURN_NONE; } /* Module: _librabbitmq */ static PyMethodDef PyRabbitMQ_functions[] = { {NULL, NULL, 0, NULL} }; #if PY_MAJOR_VERSION >= 3 static struct PyModuleDef PyRabbitMQ_moduledef = { PyModuleDef_HEAD_INIT, PYRABBITMQ_MODULE_NAME, /* m_name */ PYRABBITMQ_MODULE_DESC, /* m_doc */ -1, /* m_size */ PyRabbitMQ_functions, /* m_methods */ NULL, /* m_reload */ NULL, /* m_traverse */ NULL, /* m_clear */ NULL, /* m_free */ }; #endif PYRABBITMQ_MOD_INIT(_librabbitmq) { PyObject *module, *socket_module; if (PyType_Ready(&PyRabbitMQ_ConnectionType) < 0) { #if PY_MAJOR_VERSION >= 3 return NULL; #else return; #endif } #if PY_MAJOR_VERSION >= 3 module = PyModule_Create(&PyRabbitMQ_moduledef); #else module = Py_InitModule3(PYRABBITMQ_MODULE_NAME, PyRabbitMQ_functions, PYRABBITMQ_MODULE_DESC); #endif if (module == NULL) { #if PY_MAJOR_VERSION >= 3 return NULL; #else return; #endif } /* Get socket.error */ socket_module = PyImport_ImportModule("socket"); if (!socket_module) { #if PY_MAJOR_VERSION >= 3 return NULL; #else return; #endif } PyRabbitMQ_socket_timeout = PyObject_GetAttrString(socket_module, "timeout"); Py_XDECREF(socket_module); PyModule_AddStringConstant(module, "__version__", PYRABBITMQ_VERSION); PyModule_AddStringConstant(module, "__author__", PYRABBITMQ_AUTHOR); PyModule_AddStringConstant(module, "__contact__", PYRABBITMQ_CONTACT); PyModule_AddStringConstant(module, "__homepage__", PYRABBITMQ_HOMEPAGE); Py_INCREF(&PyRabbitMQ_ConnectionType); PyModule_AddObject(module, "Connection", (PyObject *)&PyRabbitMQ_ConnectionType); PyModule_AddIntConstant(module, "AMQP_SASL_METHOD_PLAIN", AMQP_SASL_METHOD_PLAIN); PyRabbitMQExc_ConnectionError = PyErr_NewException( "_librabbitmq.ConnectionError", NULL, NULL); PyModule_AddObject(module, "ConnectionError", (PyObject *)PyRabbitMQExc_ConnectionError); PyRabbitMQExc_ChannelError = PyErr_NewException( "_librabbitmq.ChannelError", NULL, NULL); PyModule_AddObject(module, "ChannelError", (PyObject *)PyRabbitMQExc_ChannelError); #if PY_MAJOR_VERSION >= 3 return module; #else return; #endif }