summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTony Garnock-Jones <tonyg@kcbbs.gen.nz>2009-05-07 00:40:43 +0100
committerTony Garnock-Jones <tonyg@kcbbs.gen.nz>2009-05-07 00:40:43 +0100
commita32fbd1ba39ab1ba9d1e798b1065b59f7f755267 (patch)
treec850e708249c3305ecdaa5a736ee9fa705a63453
parentdc04434f7398528ef69954f0f840ac9ce5847347 (diff)
downloadrabbitmq-c-github-ask-a32fbd1ba39ab1ba9d1e798b1065b59f7f755267.tar.gz
Introduce more mid-level API
-rw-r--r--examples/amqp_consumer.c53
-rw-r--r--examples/amqp_exchange_declare.c20
-rw-r--r--examples/amqp_listen.c53
-rw-r--r--librabbitmq/amqp.h48
-rw-r--r--librabbitmq/amqp_api.c98
-rw-r--r--librabbitmq/codegen.py3
6 files changed, 143 insertions, 132 deletions
diff --git a/examples/amqp_consumer.c b/examples/amqp_consumer.c
index 84c3669..3b347f5 100644
--- a/examples/amqp_consumer.c
+++ b/examples/amqp_consumer.c
@@ -88,7 +88,6 @@ int main(int argc, char const * const *argv) {
int sockfd;
amqp_connection_state_t conn;
- amqp_rpc_reply_t result;
amqp_bytes_t queuename;
if (argc < 3) {
@@ -109,57 +108,21 @@ int main(int argc, char const * const *argv) {
"Logging in");
{
- amqp_queue_declare_t s =
- (amqp_queue_declare_t) {
- .ticket = 0,
- .queue = {.len = 0, .bytes = NULL},
- .passive = 0,
- .durable = 0,
- .exclusive = 0,
- .auto_delete = 1,
- .nowait = 0,
- .arguments = {.num_entries = 0, .entries = NULL}
- };
- die_on_amqp_error(result = amqp_simple_rpc(conn, 1, AMQP_QUEUE_DECLARE_METHOD,
- AMQP_QUEUE_DECLARE_OK_METHOD, &s),
- "Declaring queue");
- amqp_queue_declare_ok_t *r = (amqp_queue_declare_ok_t *) result.reply.decoded;
+ amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, AMQP_EMPTY_BYTES, 0, 0, 0, 1,
+ AMQP_EMPTY_TABLE);
+ die_on_amqp_error(amqp_rpc_reply, "Declaring queue");
queuename = amqp_bytes_malloc_dup(r->queue);
if (queuename.bytes == NULL) {
die_on_error(-ENOMEM, "Copying queue name");
}
}
- {
- amqp_queue_bind_t s =
- (amqp_queue_bind_t) {
- .ticket = 0,
- .queue = queuename,
- .exchange = amqp_cstring_bytes(exchange),
- .routing_key = amqp_cstring_bytes(bindingkey),
- .nowait = 0,
- .arguments = {.num_entries = 0, .entries = NULL}
- };
- die_on_amqp_error(result = amqp_simple_rpc(conn, 1, AMQP_QUEUE_BIND_METHOD,
- AMQP_QUEUE_BIND_OK_METHOD, &s),
- "Binding queue");
- }
+ amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey),
+ AMQP_EMPTY_TABLE);
+ die_on_amqp_error(amqp_rpc_reply, "Binding queue");
- {
- amqp_basic_consume_t s =
- (amqp_basic_consume_t) {
- .ticket = 0,
- .queue = queuename,
- .consumer_tag = {.len = 0, .bytes = NULL},
- .no_local = 0,
- .no_ack = 1,
- .exclusive = 0,
- .nowait = 0
- };
- die_on_amqp_error(result = amqp_simple_rpc(conn, 1, AMQP_BASIC_CONSUME_METHOD,
- AMQP_BASIC_CONSUME_OK_METHOD, &s),
- "Consuming");
- }
+ amqp_basic_consume(conn, 1, queuename, AMQP_EMPTY_BYTES, 0, 1, 0);
+ die_on_amqp_error(amqp_rpc_reply, "Consuming");
run(conn);
diff --git a/examples/amqp_exchange_declare.c b/examples/amqp_exchange_declare.c
index d03290e..f163db1 100644
--- a/examples/amqp_exchange_declare.c
+++ b/examples/amqp_exchange_declare.c
@@ -36,23 +36,9 @@ int main(int argc, char const * const *argv) {
die_on_amqp_error(amqp_login(conn, "/", 131072, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
- {
- amqp_exchange_declare_t s =
- (amqp_exchange_declare_t) {
- .ticket = 0,
- .exchange = amqp_cstring_bytes(exchange),
- .type = amqp_cstring_bytes(exchangetype),
- .passive = 0,
- .durable = 0,
- .auto_delete = 0,
- .internal = 0,
- .nowait = 0,
- .arguments = {.num_entries = 0, .entries = NULL}
- };
- die_on_amqp_error(amqp_simple_rpc(conn, 1, AMQP_EXCHANGE_DECLARE_METHOD,
- AMQP_EXCHANGE_DECLARE_OK_METHOD, &s),
- "Declaring exchange");
- }
+ amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange), amqp_cstring_bytes(exchangetype),
+ 0, 0, 0, AMQP_EMPTY_TABLE);
+ die_on_amqp_error(amqp_rpc_reply, "Declaring exchange");
die_on_amqp_error(amqp_channel_close(conn, AMQP_REPLY_SUCCESS), "Closing channel");
die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
diff --git a/examples/amqp_listen.c b/examples/amqp_listen.c
index f759183..3bbb341 100644
--- a/examples/amqp_listen.c
+++ b/examples/amqp_listen.c
@@ -24,7 +24,6 @@ int main(int argc, char const * const *argv) {
int sockfd;
amqp_connection_state_t conn;
- amqp_rpc_reply_t result;
amqp_bytes_t queuename;
if (argc < 5) {
@@ -45,57 +44,21 @@ int main(int argc, char const * const *argv) {
"Logging in");
{
- amqp_queue_declare_t s =
- (amqp_queue_declare_t) {
- .ticket = 0,
- .queue = {.len = 0, .bytes = NULL},
- .passive = 0,
- .durable = 0,
- .exclusive = 0,
- .auto_delete = 1,
- .nowait = 0,
- .arguments = {.num_entries = 0, .entries = NULL}
- };
- die_on_amqp_error(result = amqp_simple_rpc(conn, 1, AMQP_QUEUE_DECLARE_METHOD,
- AMQP_QUEUE_DECLARE_OK_METHOD, &s),
- "Declaring queue");
- amqp_queue_declare_ok_t *r = (amqp_queue_declare_ok_t *) result.reply.decoded;
+ amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, AMQP_EMPTY_BYTES, 0, 0, 0, 1,
+ AMQP_EMPTY_TABLE);
+ die_on_amqp_error(amqp_rpc_reply, "Declaring queue");
queuename = amqp_bytes_malloc_dup(r->queue);
if (queuename.bytes == NULL) {
die_on_error(-ENOMEM, "Copying queue name");
}
}
- {
- amqp_queue_bind_t s =
- (amqp_queue_bind_t) {
- .ticket = 0,
- .queue = queuename,
- .exchange = amqp_cstring_bytes(exchange),
- .routing_key = amqp_cstring_bytes(bindingkey),
- .nowait = 0,
- .arguments = {.num_entries = 0, .entries = NULL}
- };
- die_on_amqp_error(result = amqp_simple_rpc(conn, 1, AMQP_QUEUE_BIND_METHOD,
- AMQP_QUEUE_BIND_OK_METHOD, &s),
- "Binding queue");
- }
+ amqp_queue_bind(conn, 1, queuename, amqp_cstring_bytes(exchange), amqp_cstring_bytes(bindingkey),
+ AMQP_EMPTY_TABLE);
+ die_on_amqp_error(amqp_rpc_reply, "Binding queue");
- {
- amqp_basic_consume_t s =
- (amqp_basic_consume_t) {
- .ticket = 0,
- .queue = queuename,
- .consumer_tag = {.len = 0, .bytes = NULL},
- .no_local = 0,
- .no_ack = 1,
- .exclusive = 0,
- .nowait = 0
- };
- die_on_amqp_error(result = amqp_simple_rpc(conn, 1, AMQP_BASIC_CONSUME_METHOD,
- AMQP_BASIC_CONSUME_OK_METHOD, &s),
- "Consuming");
- }
+ amqp_basic_consume(conn, 1, queuename, AMQP_EMPTY_BYTES, 0, 1, 0);
+ die_on_amqp_error(amqp_rpc_reply, "Consuming");
{
amqp_frame_t frame;
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index f27af7f..7210ec6 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -15,6 +15,8 @@ typedef struct amqp_bytes_t_ {
void *bytes;
} amqp_bytes_t;
+#define AMQP_EMPTY_BYTES ((amqp_bytes_t) { .len = 0, .bytes = NULL })
+
typedef struct amqp_decimal_t_ {
int decimals;
uint32_t value;
@@ -27,6 +29,8 @@ typedef struct amqp_table_t_ {
struct amqp_table_entry_t_ *entries;
} amqp_table_t;
+#define AMQP_EMPTY_TABLE ((amqp_table_t) { .num_entries = 0, .entries = NULL })
+
typedef struct amqp_table_entry_t_ {
amqp_bytes_t key;
char kind;
@@ -159,6 +163,15 @@ extern amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state,
amqp_method_number_t expected_reply_id,
void *decoded_request_method);
+#define AMQP_SIMPLE_RPC(state, channel, classname, requestname, replyname, structname, ...) \
+ ({ \
+ structname _simple_rpc_request___ = (structname) { __VA_ARGS__ }; \
+ amqp_simple_rpc(state, channel, \
+ AMQP_ ## classname ## _ ## requestname ## _METHOD, \
+ AMQP_ ## classname ## _ ## replyname ## _METHOD, \
+ &_simple_rpc_request___); \
+ })
+
extern amqp_rpc_reply_t amqp_login(amqp_connection_state_t state,
char const *vhost,
int frame_max,
@@ -176,6 +189,41 @@ extern int amqp_basic_publish(amqp_connection_state_t state,
extern amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, int code);
extern amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, int code);
+extern amqp_rpc_reply_t amqp_rpc_reply;
+
+extern struct amqp_exchange_declare_ok_t_ *amqp_exchange_declare(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_bytes_t exchange,
+ amqp_bytes_t type,
+ amqp_boolean_t passive,
+ amqp_boolean_t durable,
+ amqp_boolean_t auto_delete,
+ amqp_table_t arguments);
+
+extern struct amqp_queue_declare_ok_t_ *amqp_queue_declare(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_bytes_t queue,
+ amqp_boolean_t passive,
+ amqp_boolean_t durable,
+ amqp_boolean_t exclusive,
+ amqp_boolean_t auto_delete,
+ amqp_table_t arguments);
+
+extern struct amqp_queue_bind_ok_t_ *amqp_queue_bind(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_bytes_t queue,
+ amqp_bytes_t exchange,
+ amqp_bytes_t routing_key,
+ amqp_table_t arguments);
+
+extern struct amqp_basic_consume_ok_t_ *amqp_basic_consume(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_bytes_t queue,
+ amqp_bytes_t consumer_tag,
+ amqp_boolean_t no_local,
+ amqp_boolean_t no_ack,
+ amqp_boolean_t exclusive);
+
#ifdef __cplusplus
}
#endif
diff --git a/librabbitmq/amqp_api.c b/librabbitmq/amqp_api.c
index fc8b56c..064436f 100644
--- a/librabbitmq/amqp_api.c
+++ b/librabbitmq/amqp_api.c
@@ -71,31 +71,81 @@ int amqp_basic_publish(amqp_connection_state_t state,
}
amqp_rpc_reply_t amqp_channel_close(amqp_connection_state_t state, int code) {
- amqp_channel_close_t s =
- (amqp_channel_close_t) {
- .reply_code = code,
- .reply_text = {.len = 0, .bytes = NULL},
- .class_id = 0,
- .method_id = 0
- };
- return amqp_simple_rpc(state,
- 1,
- AMQP_CHANNEL_CLOSE_METHOD,
- AMQP_CHANNEL_CLOSE_OK_METHOD,
- &s);
+ return AMQP_SIMPLE_RPC(state, 1, CHANNEL, CLOSE, CLOSE_OK,
+ amqp_channel_close_t,
+ code, {0,NULL}, 0, 0);
}
amqp_rpc_reply_t amqp_connection_close(amqp_connection_state_t state, int code) {
- amqp_connection_close_t s =
- (amqp_connection_close_t) {
- .reply_code = code,
- .reply_text = {.len = 0, .bytes = NULL},
- .class_id = 0,
- .method_id = 0
- };
- return amqp_simple_rpc(state,
- 0,
- AMQP_CONNECTION_CLOSE_METHOD,
- AMQP_CONNECTION_CLOSE_OK_METHOD,
- &s);
+ return AMQP_SIMPLE_RPC(state, 0, CONNECTION, CLOSE, CLOSE_OK,
+ amqp_connection_close_t,
+ code, {0,NULL}, 0, 0);
+}
+
+amqp_rpc_reply_t amqp_rpc_reply;
+
+#define RPC_REPLY(replytype) \
+ (amqp_rpc_reply.reply_type == AMQP_RESPONSE_NORMAL \
+ ? (replytype *) amqp_rpc_reply.reply.decoded \
+ : NULL)
+
+amqp_exchange_declare_ok_t *amqp_exchange_declare(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_bytes_t exchange,
+ amqp_bytes_t type,
+ amqp_boolean_t passive,
+ amqp_boolean_t durable,
+ amqp_boolean_t auto_delete,
+ amqp_table_t arguments)
+{
+ amqp_rpc_reply =
+ AMQP_SIMPLE_RPC(state, channel, EXCHANGE, DECLARE, DECLARE_OK,
+ amqp_exchange_declare_t,
+ 0, exchange, type, passive, durable, auto_delete, 0, 0, arguments);
+ return RPC_REPLY(amqp_exchange_declare_ok_t);
+}
+
+amqp_queue_declare_ok_t *amqp_queue_declare(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_bytes_t queue,
+ amqp_boolean_t passive,
+ amqp_boolean_t durable,
+ amqp_boolean_t exclusive,
+ amqp_boolean_t auto_delete,
+ amqp_table_t arguments)
+{
+ amqp_rpc_reply =
+ AMQP_SIMPLE_RPC(state, channel, QUEUE, DECLARE, DECLARE_OK,
+ amqp_queue_declare_t,
+ 0, queue, passive, durable, exclusive, auto_delete, 0, arguments);
+ return RPC_REPLY(amqp_queue_declare_ok_t);
+}
+
+amqp_queue_bind_ok_t *amqp_queue_bind(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_bytes_t queue,
+ amqp_bytes_t exchange,
+ amqp_bytes_t routing_key,
+ amqp_table_t arguments)
+{
+ amqp_rpc_reply =
+ AMQP_SIMPLE_RPC(state, channel, QUEUE, BIND, BIND_OK,
+ amqp_queue_bind_t,
+ 0, queue, exchange, routing_key, 0, arguments);
+ return RPC_REPLY(amqp_queue_bind_ok_t);
+}
+
+amqp_basic_consume_ok_t *amqp_basic_consume(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_bytes_t queue,
+ amqp_bytes_t consumer_tag,
+ amqp_boolean_t no_local,
+ amqp_boolean_t no_ack,
+ amqp_boolean_t exclusive)
+{
+ amqp_rpc_reply =
+ AMQP_SIMPLE_RPC(state, channel, BASIC, CONSUME, CONSUME_OK,
+ amqp_basic_consume_t,
+ 0, queue, consumer_tag, no_local, no_ack, exclusive, 0);
+ return RPC_REPLY(amqp_basic_consume_ok_t);
}
diff --git a/librabbitmq/codegen.py b/librabbitmq/codegen.py
index 0967119..30cb79c 100644
--- a/librabbitmq/codegen.py
+++ b/librabbitmq/codegen.py
@@ -406,7 +406,8 @@ extern int amqp_encode_properties(uint16_t class_id,
m.klass.index,
m.index,
methodid)
- print "typedef struct {\n%s} %s;\n" % (fieldDeclList(m.arguments), m.structName())
+ print "typedef struct %s_ {\n%s} %s;\n" % \
+ (m.structName(), fieldDeclList(m.arguments), m.structName())
print "/* Class property records. */"
for c in spec.allClasses():