summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-03-18 17:44:01 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-03-18 17:44:01 +0000
commitfef08832f626b266b1bf1472e1c2603c0b3a7afd (patch)
treefccae6550c361c2562f49543e54792b4dc68219d
parentec5086953c6b6739255041b9d30d420257f2ceba (diff)
downloadrabbitmq-server-bug26030.tar.gz
First hack at recovering consumers.bug26030
-rw-r--r--src/rabbit_channel.erl148
1 files changed, 88 insertions, 60 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 56a3cbb6..600488cd 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -755,9 +755,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
exclusive = ExclusiveConsume,
nowait = NoWait,
arguments = Args},
- _, State = #ch{conn_pid = ConnPid,
- limiter = Limiter,
- consumer_prefetch = ConsumerPrefetchCount,
+ _, State = #ch{consumer_prefetch = ConsumerPrefetch,
consumer_mapping = ConsumerMapping}) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
@@ -769,38 +767,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
"amq.ctag");
Other -> Other
end,
-
- %% We get the queue process to send the consume_ok on our
- %% behalf. This is for symmetry with basic.cancel - see
- %% the comment in that method for why.
- case rabbit_amqqueue:with_exclusive_access_or_die(
- QueueName, ConnPid,
- fun (Q) ->
- {rabbit_amqqueue:basic_consume(
- Q, NoAck, self(),
- rabbit_limiter:pid(Limiter),
- rabbit_limiter:is_active(Limiter),
- ConsumerPrefetchCount,
- ActualConsumerTag, ExclusiveConsume, Args,
- ok_msg(NoWait, #'basic.consume_ok'{
- consumer_tag = ActualConsumerTag})),
- Q}
- end) of
- {ok, Q = #amqqueue{pid = QPid, name = QName}} ->
- CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping),
- State1 = monitor_delivering_queue(
- NoAck, QPid, QName,
- State#ch{consumer_mapping = CM1}),
- {noreply,
- case NoWait of
- true -> consumer_monitor(ActualConsumerTag, State1);
- false -> State1
- end};
- {{error, exclusive_consume_unavailable}, _Q} ->
- rabbit_misc:protocol_error(
- access_refused, "~s in exclusive use",
- [rabbit_misc:rs(QueueName)])
- end;
+ {noreply, basic_consume(
+ QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
+ ExclusiveConsume, Args, NoWait, State)};
{ok, _} ->
%% Attempted reuse of consumer tag.
rabbit_misc:protocol_error(
@@ -1174,10 +1143,11 @@ handle_method(#'basic.credit'{consumer_tag = CTag,
drain = Drain},
_, State = #ch{consumer_mapping = Consumers}) ->
case dict:find(CTag, Consumers) of
- {ok, Q} -> ok = rabbit_amqqueue:credit(
- Q, self(), CTag, Credit, Drain),
- {noreply, State};
- error -> precondition_failed("unknown consumer tag '~s'", [CTag])
+ {ok, {Q, _Args}} -> ok = rabbit_amqqueue:credit(
+ Q, self(), CTag, Credit, Drain),
+ {noreply, State};
+ error -> precondition_failed(
+ "unknown consumer tag '~s'", [CTag])
end;
handle_method(_MethodRecord, _Content, _State) ->
@@ -1186,6 +1156,45 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
+%% We get the queue process to send the consume_ok on our behalf. This
+%% is for symmetry with basic.cancel - see the comment in that method
+%% for why.
+basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
+ ExclusiveConsume, Args, NoWait,
+ State = #ch{conn_pid = ConnPid,
+ limiter = Limiter,
+ consumer_mapping = ConsumerMapping}) ->
+ case rabbit_amqqueue:with_exclusive_access_or_die(
+ QueueName, ConnPid,
+ fun (Q) ->
+ {rabbit_amqqueue:basic_consume(
+ Q, NoAck, self(),
+ rabbit_limiter:pid(Limiter),
+ rabbit_limiter:is_active(Limiter),
+ ConsumerPrefetch, ActualConsumerTag,
+ ExclusiveConsume, Args,
+ ok_msg(NoWait, #'basic.consume_ok'{
+ consumer_tag = ActualConsumerTag})),
+ Q}
+ end) of
+ {ok, Q = #amqqueue{pid = QPid, name = QName}} ->
+ CM1 = dict:store(
+ ActualConsumerTag,
+ {Q, {NoAck, ConsumerPrefetch, ExclusiveConsume, Args}},
+ ConsumerMapping),
+ State1 = monitor_delivering_queue(
+ NoAck, QPid, QName,
+ State#ch{consumer_mapping = CM1}),
+ case NoWait of
+ true -> consumer_monitor(ActualConsumerTag, State1);
+ false -> State1
+ end;
+ {{error, exclusive_consume_unavailable}, _Q} ->
+ rabbit_misc:protocol_error(
+ access_refused, "~s in exclusive use",
+ [rabbit_misc:rs(QueueName)])
+ end.
+
consumer_monitor(ConsumerTag,
State = #ch{consumer_mapping = ConsumerMapping,
queue_monitors = QMons,
@@ -1194,7 +1203,8 @@ consumer_monitor(ConsumerTag,
case rabbit_misc:table_lookup(
Capabilities, <<"consumer_cancel_notify">>) of
{bool, true} ->
- #amqqueue{pid = QPid} = dict:fetch(ConsumerTag, ConsumerMapping),
+ {#amqqueue{pid = QPid}, _Args} =
+ dict:fetch(ConsumerTag, ConsumerMapping),
QCons1 = dict:update(QPid,
fun (CTags) ->
gb_sets:insert(ConsumerTag, CTags)
@@ -1231,28 +1241,46 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC,
end.
-handle_consuming_queue_down(QPid,
- State = #ch{consumer_mapping = ConsumerMapping,
- queue_consumers = QCons,
- queue_names = QNames}) ->
+handle_consuming_queue_down(QPid, State = #ch{queue_consumers = QCons,
+ queue_names = QNames}) ->
ConsumerTags = case dict:find(QPid, QCons) of
error -> gb_sets:new();
{ok, CTags} -> CTags
end,
- ConsumerMapping1 =
- gb_sets:fold(fun (CTag, CMap) ->
- ok = send(#'basic.cancel'{consumer_tag = CTag,
- nowait = true},
- State),
- rabbit_event:notify(
- consumer_deleted,
- [{consumer_tag, CTag},
- {channel, self()},
- {queue, dict:fetch(QPid, QNames)}]),
- dict:erase(CTag, CMap)
- end, ConsumerMapping, ConsumerTags),
- State#ch{consumer_mapping = ConsumerMapping1,
- queue_consumers = dict:erase(QPid, QCons)}.
+ gb_sets:fold(
+ fun (CTag, StateN = #ch{consumer_mapping = CMap}) ->
+ QName = dict:fetch(QPid, QNames),
+ case queue_down_consumer_action(QPid, QName, CTag, CMap) of
+ remove ->
+ ok = send(#'basic.cancel'{consumer_tag = CTag,
+ nowait = true},
+ State),
+ rabbit_event:notify(
+ consumer_deleted, [{consumer_tag, CTag},
+ {channel, self()},
+ {queue, QName}]),
+ StateN#ch{consumer_mapping = dict:erase(CTag, CMap)};
+ {recover, {NoAck, ConsumerPrefetch, Exclusive, Args}} ->
+ basic_consume(
+ QName, NoAck, ConsumerPrefetch, CTag,
+ Exclusive, Args, true, StateN)
+ end
+ end, State#ch{queue_consumers = dict:erase(QPid, QCons)}, ConsumerTags).
+
+queue_down_consumer_action(QPid, QName, CTag, CMap) ->
+ {_, {_, _, _, Args} = ConsumeSpec} = dict:fetch(CTag, CMap),
+ case rabbit_misc:table_lookup(Args, <<"recover-on-ha-failover">>) of
+ {bool, true} ->
+ case rabbit_amqqueue:lookup(QName) of
+ {ok, #amqqueue{pid = QPid}} -> timer:sleep(25),
+ queue_down_consumer_action(
+ QPid, QName, CTag, CMap);
+ {ok, _Q} -> {recover, ConsumeSpec};
+ {error, not_found} -> remove
+ end;
+ _ ->
+ remove
+ end.
handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
@@ -1427,8 +1455,8 @@ foreach_per_queue(F, UAL) ->
rabbit_misc:gb_trees_foreach(F, T).
consumer_queues(Consumers) ->
- lists:usort([QPid ||
- {_Key, #amqqueue{pid = QPid}} <- dict:to_list(Consumers)]).
+ lists:usort([QPid || {_Key, {#amqqueue{pid = QPid}, _Args}}
+ <- dict:to_list(Consumers)]).
%% tell the limiter about the number of acks that have been received
%% for messages delivered to subscribed consumers, but not acks for