diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-04-17 11:24:42 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-04-17 11:24:42 +0100 |
commit | 1b15768f84d430e0f5363ff07a3f64009144cea3 (patch) | |
tree | d574c26110287e18c1210ce77e7eb0281a634948 | |
parent | c6e332a5723ee6aececdf2ef67398ccc26b32362 (diff) | |
download | rabbitmq-server-bug26127.tar.gz |
Remove rabbit_amqqueue:force_event_refresh/1 synchronybug26127
-rw-r--r-- | src/rabbit_amqqueue.erl | 24 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 38 |
2 files changed, 22 insertions, 40 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index d38f8191..1aba7ecb 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -45,8 +45,6 @@ -define(MORE_CONSUMER_CREDIT_AFTER, 50). --define(FAILOVER_WAIT_MILLIS, 100). - %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -517,26 +515,10 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). -%% We need to account for the idea that queues may be mid-promotion -%% during force_event_refresh (since it's likely we're doing this in -%% the first place since a node failed). Therefore we keep poking at -%% the list of queues until we were able to talk to a live process or -%% the queue no longer exists. force_event_refresh(Ref) -> - force_event_refresh([Q#amqqueue.name || Q <- list()], Ref). - -force_event_refresh(QNames, Ref) -> - Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)], - {_, Bad} = gen_server2:mcall( - [{Q#amqqueue.pid, {force_event_refresh, Ref}} || Q <- Qs]), - FailedPids = [Pid || {Pid, _Reason} <- Bad], - Failed = [Name || #amqqueue{name = Name, pid = Pid} <- Qs, - lists:member(Pid, FailedPids)], - case Failed of - [] -> ok; - _ -> timer:sleep(?FAILOVER_WAIT_MILLIS), - force_event_refresh(Failed, Ref) - end. + [gen_server2:cast(Q#amqqueue.pid, + {force_event_refresh, Ref}) || Q <- list()], + ok. notify_policy_changed(#amqqueue{pid = QPid}) -> gen_server2:cast(QPid, policy_changed). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5d3f3a12..9b785303 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1062,25 +1062,7 @@ handle_call(sync_mirrors, _From, State) -> %% By definition if we get this message here we do not have to do anything. handle_call(cancel_sync_mirrors, _From, State) -> - reply({ok, not_syncing}, State); - -handle_call({force_event_refresh, Ref}, _From, - State = #q{consumers = Consumers, - exclusive_consumer = Exclusive}) -> - rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State), Ref), - QName = qname(State), - AllConsumers = rabbit_queue_consumers:all(Consumers), - case Exclusive of - none -> [emit_consumer_created( - Ch, CTag, false, AckRequired, QName, Prefetch, - Args, Ref) || - {Ch, CTag, AckRequired, Prefetch, Args} - <- AllConsumers]; - {Ch, CTag} -> [{Ch, CTag, AckRequired, Prefetch, Args}] = AllConsumers, - emit_consumer_created( - Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref) - end, - reply(ok, State). + reply({ok, not_syncing}, State). handle_cast({run_backing_queue, Mod, Fun}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> @@ -1167,6 +1149,24 @@ handle_cast({credit, ChPid, CTag, Credit, Drain}, run_message_queue(true, State1) end); +handle_cast({force_event_refresh, Ref}, + State = #q{consumers = Consumers, + exclusive_consumer = Exclusive}) -> + rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State), Ref), + QName = qname(State), + AllConsumers = rabbit_queue_consumers:all(Consumers), + case Exclusive of + none -> [emit_consumer_created( + Ch, CTag, false, AckRequired, QName, Prefetch, + Args, Ref) || + {Ch, CTag, AckRequired, Prefetch, Args} + <- AllConsumers]; + {Ch, CTag} -> [{Ch, CTag, AckRequired, Prefetch, Args}] = AllConsumers, + emit_consumer_created( + Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref) + end, + noreply(State); + handle_cast(notify_decorators, State) -> notify_decorators(State), noreply(State); |