summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-08-21 17:05:03 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-08-21 17:05:03 +0100
commit5a38b5e837c68ea8c86acd3edc2bc5b932271b5f (patch)
tree5c767a2f99b7bdae4d04e907b390a953566280fc
parent968d65b72c0734b7552aad7225e33646847786c6 (diff)
downloadrabbitmq-server-bug25725.tar.gz
Add consumer arguments to events and rabbitmqctl.bug25725
-rw-r--r--docs/rabbitmqctl.1.xml10
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl23
3 files changed, 19 insertions, 20 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index b2361cde..d7c93924 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1618,14 +1618,10 @@
characters, the name of the queue subscribed to, the id of
the channel process via which the subscription was created
and is managed, the consumer tag which uniquely identifies
- the subscription within a channel, and a boolean
+ the subscription within a channel, a boolean
indicating whether acknowledgements are expected for
- messages delivered to this consumer.
- </para>
- <para>
- The output is a list of rows containing, in order, the queue name,
- channel process id, consumer tag, and a boolean indicating whether
- acknowledgements are expected from the consumer.
+ messages delivered to this consumer, and any arguments for this
+ consumer.
</para>
</listitem>
</varlistentry>
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 10f97afd..0673ff8e 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -184,7 +184,7 @@
%%----------------------------------------------------------------------------
-define(CONSUMER_INFO_KEYS,
- [queue_name, channel_pid, consumer_tag, ack_required]).
+ [queue_name, channel_pid, consumer_tag, ack_required, arguments]).
recover() ->
%% Clear out remnants of old incarnation, in case we restarted
@@ -512,8 +512,8 @@ consumers_all(VHostPath) ->
map(VHostPath,
fun (Q) ->
[lists:zip(ConsumerInfoKeys,
- [Q#amqqueue.name, ChPid, ConsumerTag, AckRequired]) ||
- {ChPid, ConsumerTag, AckRequired} <- consumers(Q)]
+ [Q#amqqueue.name, ChPid, CTag, AckRequired, Args]) ||
+ {ChPid, CTag, AckRequired, Args} <- consumers(Q)]
end)).
stat(#amqqueue{pid = QPid}) -> delegate:call(QPid, stat).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 547efa45..972e6be0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1012,8 +1012,9 @@ consumers(#q{active_consumers = ActiveConsumers}) ->
consumers(Consumers, Acc) ->
priority_queue:fold(
- fun ({ChPid, #consumer{tag = CTag, ack_required = AckReq}}, _P, Acc1) ->
- [{ChPid, CTag, AckReq} | Acc1]
+ fun ({ChPid, Consumer}, _P, Acc1) ->
+ #consumer{tag = CTag, ack_required = Ack, args = Args} = Consumer,
+ [{ChPid, CTag, Ack, Args} | Acc1]
end, Acc, Consumers).
emit_stats(State) ->
@@ -1022,13 +1023,14 @@ emit_stats(State) ->
emit_stats(State, Extra) ->
rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)).
-emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired, QName) ->
+emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Args) ->
rabbit_event:notify(consumer_created,
- [{consumer_tag, ConsumerTag},
+ [{consumer_tag, CTag},
{exclusive, Exclusive},
{ack_required, AckRequired},
{channel, ChPid},
- {queue, QName}]).
+ {queue, QName},
+ {arguments, Args}]).
emit_consumer_deleted(ChPid, ConsumerTag, QName) ->
rabbit_event:notify(consumer_deleted,
@@ -1176,7 +1178,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck, qname(State1)),
+ not NoAck, qname(State1), OtherArgs),
AC1 = add_consumer({ChPid, Consumer}, State1#q.active_consumers),
State2 = State1#q{active_consumers = AC1},
reply(ok, run_message_queue(State2))
@@ -1275,10 +1277,11 @@ handle_call(force_event_refresh, _From,
QName = qname(State),
case Exclusive of
none -> [emit_consumer_created(
- Ch, CTag, false, AckRequired, QName) ||
- {Ch, CTag, AckRequired} <- consumers(State)];
- {Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State),
- emit_consumer_created(Ch, CTag, true, AckRequired, QName)
+ Ch, CTag, false, AckRequired, QName, Args) ||
+ {Ch, CTag, AckRequired, Args} <- consumers(State)];
+ {Ch, CTag} -> [{Ch, CTag, AckRequired, Args}] = consumers(State),
+ emit_consumer_created(
+ Ch, CTag, true, AckRequired, QName, Args)
end,
reply(ok, State).