summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-04-17 11:24:42 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-04-17 11:24:42 +0100
commit1b15768f84d430e0f5363ff07a3f64009144cea3 (patch)
treed574c26110287e18c1210ce77e7eb0281a634948
parentc6e332a5723ee6aececdf2ef67398ccc26b32362 (diff)
downloadrabbitmq-server-bug26127.tar.gz
Remove rabbit_amqqueue:force_event_refresh/1 synchronybug26127
-rw-r--r--src/rabbit_amqqueue.erl24
-rw-r--r--src/rabbit_amqqueue_process.erl38
2 files changed, 22 insertions, 40 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index d38f8191..1aba7ecb 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -45,8 +45,6 @@
-define(MORE_CONSUMER_CREDIT_AFTER, 50).
--define(FAILOVER_WAIT_MILLIS, 100).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -517,26 +515,10 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
-%% We need to account for the idea that queues may be mid-promotion
-%% during force_event_refresh (since it's likely we're doing this in
-%% the first place since a node failed). Therefore we keep poking at
-%% the list of queues until we were able to talk to a live process or
-%% the queue no longer exists.
force_event_refresh(Ref) ->
- force_event_refresh([Q#amqqueue.name || Q <- list()], Ref).
-
-force_event_refresh(QNames, Ref) ->
- Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)],
- {_, Bad} = gen_server2:mcall(
- [{Q#amqqueue.pid, {force_event_refresh, Ref}} || Q <- Qs]),
- FailedPids = [Pid || {Pid, _Reason} <- Bad],
- Failed = [Name || #amqqueue{name = Name, pid = Pid} <- Qs,
- lists:member(Pid, FailedPids)],
- case Failed of
- [] -> ok;
- _ -> timer:sleep(?FAILOVER_WAIT_MILLIS),
- force_event_refresh(Failed, Ref)
- end.
+ [gen_server2:cast(Q#amqqueue.pid,
+ {force_event_refresh, Ref}) || Q <- list()],
+ ok.
notify_policy_changed(#amqqueue{pid = QPid}) ->
gen_server2:cast(QPid, policy_changed).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5d3f3a12..9b785303 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1062,25 +1062,7 @@ handle_call(sync_mirrors, _From, State) ->
%% By definition if we get this message here we do not have to do anything.
handle_call(cancel_sync_mirrors, _From, State) ->
- reply({ok, not_syncing}, State);
-
-handle_call({force_event_refresh, Ref}, _From,
- State = #q{consumers = Consumers,
- exclusive_consumer = Exclusive}) ->
- rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State), Ref),
- QName = qname(State),
- AllConsumers = rabbit_queue_consumers:all(Consumers),
- case Exclusive of
- none -> [emit_consumer_created(
- Ch, CTag, false, AckRequired, QName, Prefetch,
- Args, Ref) ||
- {Ch, CTag, AckRequired, Prefetch, Args}
- <- AllConsumers];
- {Ch, CTag} -> [{Ch, CTag, AckRequired, Prefetch, Args}] = AllConsumers,
- emit_consumer_created(
- Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref)
- end,
- reply(ok, State).
+ reply({ok, not_syncing}, State).
handle_cast({run_backing_queue, Mod, Fun},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
@@ -1167,6 +1149,24 @@ handle_cast({credit, ChPid, CTag, Credit, Drain},
run_message_queue(true, State1)
end);
+handle_cast({force_event_refresh, Ref},
+ State = #q{consumers = Consumers,
+ exclusive_consumer = Exclusive}) ->
+ rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State), Ref),
+ QName = qname(State),
+ AllConsumers = rabbit_queue_consumers:all(Consumers),
+ case Exclusive of
+ none -> [emit_consumer_created(
+ Ch, CTag, false, AckRequired, QName, Prefetch,
+ Args, Ref) ||
+ {Ch, CTag, AckRequired, Prefetch, Args}
+ <- AllConsumers];
+ {Ch, CTag} -> [{Ch, CTag, AckRequired, Prefetch, Args}] = AllConsumers,
+ emit_consumer_created(
+ Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref)
+ end,
+ noreply(State);
+
handle_cast(notify_decorators, State) ->
notify_decorators(State),
noreply(State);