diff options
-rw-r--r-- | packaging/RPMS/Fedora/Makefile | 3 | ||||
-rw-r--r-- | packaging/common/rabbitmq-server.init | 5 | ||||
-rw-r--r-- | packaging/debs/Debian/Makefile | 1 | ||||
-rw-r--r-- | packaging/windows/Makefile | 5 | ||||
-rw-r--r-- | src/credit_flow.erl | 3 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 23 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 89 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 5 | ||||
-rw-r--r-- | src/rabbit_guid.erl | 10 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 13 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 28 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 2 | ||||
-rw-r--r-- | src/rabbit_net.erl | 7 | ||||
-rw-r--r-- | src/rabbit_nodes.erl | 16 | ||||
-rw-r--r-- | src/rabbit_queue_index.erl | 50 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 13 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 21 | ||||
-rw-r--r-- | src/rabbit_upgrade.erl | 12 | ||||
-rw-r--r-- | src/rabbit_version.erl | 5 | ||||
-rw-r--r-- | src/supervisor2.erl | 63 | ||||
-rw-r--r-- | src/tcp_acceptor.erl | 8 |
21 files changed, 255 insertions, 127 deletions
diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index c67d8fd6..234fc2c7 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -15,9 +15,11 @@ endif ifeq "$(RPM_OS)" "suse" REQUIRES=/sbin/chkconfig /sbin/service OS_DEFINES=--define '_initrddir /etc/init.d' --define 'dist .suse' +START_PROG=setsid else REQUIRES=chkconfig initscripts OS_DEFINES=--define '_initrddir /etc/rc.d/init.d' +START_PROG=runuser rabbitmq --session-command endif rpms: clean server @@ -32,6 +34,7 @@ prepare: cp ${COMMON_DIR}/* SOURCES/ sed -i \ -e 's|^LOCK_FILE=.*$$|LOCK_FILE=/var/lock/subsys/$$NAME|' \ + -e 's|^START_PROG=.*$$|START_PROG="$(START_PROG)"|' \ SOURCES/rabbitmq-server.init ifeq "$(RPM_OS)" "fedora" # Fedora says that only vital services should have Default-Start diff --git a/packaging/common/rabbitmq-server.init b/packaging/common/rabbitmq-server.init index 4084d8c7..c942f8e3 100644 --- a/packaging/common/rabbitmq-server.init +++ b/packaging/common/rabbitmq-server.init @@ -26,7 +26,8 @@ ROTATE_SUFFIX= INIT_LOG_DIR=/var/log/rabbitmq PID_FILE=/var/run/rabbitmq/pid -LOCK_FILE= # This is filled in when building packages +START_PROG= # Set when building package +LOCK_FILE= # Set when building package test -x $DAEMON || exit 0 test -x $CONTROL || exit 0 @@ -56,7 +57,7 @@ start_rabbitmq () { RETVAL=0 ensure_pid_dir set +e - RABBITMQ_PID_FILE=$PID_FILE setsid $DAEMON \ + RABBITMQ_PID_FILE=$PID_FILE $START_PROG $DAEMON \ > "${INIT_LOG_DIR}/startup_log" \ 2> "${INIT_LOG_DIR}/startup_err" \ 0<&- & diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile index 79e9c1dd..2a738f6e 100644 --- a/packaging/debs/Debian/Makefile +++ b/packaging/debs/Debian/Makefile @@ -26,6 +26,7 @@ package: clean # runlevel 2 should start network services. sed -i \ -e 's|^LOCK_FILE=.*$$|LOCK_FILE=|' \ + -e 's|^START_PROG=.*$$|START_PROG="start-stop-daemon -v --chuid rabbitmq --start --exec"|' \ -e 's|^\(# Default-Start:\).*$$|\1 2 3 4 5|' \ -e 's|^\(# Default-Stop:\).*$$|\1 0 1 6|' \ $(UNPACKED_DIR)/debian/rabbitmq-server.init diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile index a910941b..1c222162 100644 --- a/packaging/windows/Makefile +++ b/packaging/windows/Makefile @@ -8,10 +8,7 @@ dist: $(MAKE) -C $(SOURCE_DIR) mkdir $(SOURCE_DIR)/sbin - mv $(SOURCE_DIR)/scripts/rabbitmq-server.bat $(SOURCE_DIR)/sbin - mv $(SOURCE_DIR)/scripts/rabbitmq-service.bat $(SOURCE_DIR)/sbin - mv $(SOURCE_DIR)/scripts/rabbitmq-plugins.bat $(SOURCE_DIR)/sbin - mv $(SOURCE_DIR)/scripts/rabbitmqctl.bat $(SOURCE_DIR)/sbin + mv $(SOURCE_DIR)/scripts/*.bat $(SOURCE_DIR)/sbin rm -rf $(SOURCE_DIR)/scripts rm -rf $(SOURCE_DIR)/codegen* $(SOURCE_DIR)/Makefile $(SOURCE_DIR)/*mk rm -f $(SOURCE_DIR)/README diff --git a/src/credit_flow.erl b/src/credit_flow.erl index 072f4d9d..ba99811f 100644 --- a/src/credit_flow.erl +++ b/src/credit_flow.erl @@ -96,7 +96,8 @@ peer_down(Peer) -> %% credit_deferred and thus send messages into the void... unblock(Peer), erase({credit_from, Peer}), - erase({credit_to, Peer}). + erase({credit_to, Peer}), + ok. %% -------------------------------------------------------------------------- diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 48236ca5..c2724a12 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -42,6 +42,8 @@ -define(MORE_CONSUMER_CREDIT_AFTER, 50). +-define(FAILOVER_WAIT_MILLIS, 100). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -421,9 +423,26 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). +%% We need to account for the idea that queues may be mid-promotion +%% during force_event_refresh (since it's likely we're doing this in +%% the first place since a node failed). Therefore we keep poking at +%% the list of queues until we were able to talk to a live process or +%% the queue no longer exists. force_event_refresh() -> - [gen_server2:cast(Q#amqqueue.pid, force_event_refresh) || Q <- list()], - ok. + force_event_refresh([Q#amqqueue.name || Q <- list()]). + +force_event_refresh(QNames) -> + Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)], + {_, Bad} = rabbit_misc:multi_call( + [Q#amqqueue.pid || Q <- Qs], force_event_refresh), + FailedPids = [Pid || {Pid, _Reason} <- Bad], + Failed = [Name || #amqqueue{name = Name, pid = Pid} <- Qs, + lists:member(Pid, FailedPids)], + case Failed of + [] -> ok; + _ -> timer:sleep(?FAILOVER_WAIT_MILLIS), + force_event_refresh(Failed) + end. consumers(#amqqueue{ pid = QPid }) -> delegate_call(QPid, consumers). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index ee547138..e1fd9bbc 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -24,9 +24,6 @@ -define(SYNC_INTERVAL, 25). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). --define(BASE_MESSAGE_PROPERTIES, - #message_properties{expiry = undefined, needs_confirming = false}). - -export([start_link/1, info_keys/0]). -export([init_with_backing_queue_state/7]). @@ -330,12 +327,10 @@ ensure_expiry_timer(State = #q{expires = undefined}) -> State; ensure_expiry_timer(State = #q{expires = Expires}) -> case is_unused(State) of - true -> - NewState = stop_expiry_timer(State), - TRef = erlang:send_after(Expires, self(), maybe_expire), - NewState#q{expiry_timer_ref = TRef}; - false -> - State + true -> NewState = stop_expiry_timer(State), + TRef = erlang:send_after(Expires, self(), maybe_expire), + NewState#q{expiry_timer_ref = TRef}; + false -> State end. ensure_stats_timer(State) -> @@ -529,15 +524,10 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, {false, BQS1} -> DeliverFun = fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) -> - %% we don't need an expiry here because - %% messages are not being enqueued, so we use - %% an empty message_properties. - {AckTag, BQS3} = - BQ:publish_delivered( - AckRequired, Message, - (?BASE_MESSAGE_PROPERTIES)#message_properties{ - needs_confirming = needs_confirming(Confirm)}, - SenderPid, BQS2), + Props = message_properties(Confirm, State1), + {AckTag, BQS3} = BQ:publish_delivered( + AckRequired, Message, Props, + SenderPid, BQS2), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS3}} end, @@ -564,8 +554,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, maybe_record_confirm_message(Confirm, State1), case Delivered of true -> State2; - false -> Props = (message_properties(State)) #message_properties{ - needs_confirming = needs_confirming(Confirm)}, + false -> Props = message_properties(Confirm, State), BQS1 = BQ:publish(Message, Props, SenderPid, BQS), ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) end. @@ -693,8 +682,9 @@ discard_delivery(#delivery{sender = SenderPid, backing_queue_state = BQS}) -> State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}. -message_properties(#q{ttl=TTL}) -> - #message_properties{expiry = calculate_msg_expiry(TTL)}. +message_properties(Confirm, #q{ttl = TTL}) -> + #message_properties{expiry = calculate_msg_expiry(TTL), + needs_confirming = needs_confirming(Confirm)}. calculate_msg_expiry(undefined) -> undefined; calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000). @@ -732,10 +722,9 @@ dead_letter_fun(Reason, _State) -> dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) -> case rabbit_exchange:lookup(DLX) of - {error, not_found} -> - noreply(State); - _ -> - dead_letter_msg_existing_dlx(Msg, AckTag, Reason, State) + {error, not_found} -> noreply(State); + _ -> dead_letter_msg_existing_dlx(Msg, AckTag, Reason, + State) end. dead_letter_msg_existing_dlx(Msg, AckTag, Reason, @@ -752,7 +741,7 @@ dead_letter_msg_existing_dlx(Msg, AckTag, Reason, State1 = lists:foldl(fun monitor_queue/2, State, QPids), State2 = State1#q{publish_seqno = MsgSeqNo + 1}, case QPids of - [] -> {_, BQS1} = BQ:ack([AckTag], BQS), + [] -> {_Guids, BQS1} = BQ:ack([AckTag], BQS), cleanup_after_confirm(State2#q{backing_queue_state = BQS1}); _ -> State3 = lists:foldl( @@ -789,11 +778,12 @@ handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons, error -> noreply(State); {ok, _} -> - #resource{name = QName} = qname(State), - rabbit_log:info("DLQ ~p (for ~p) died~n", [QPid, QName]), + rabbit_log:info("DLQ ~p (for ~s) died~n", + [QPid, rabbit_misc:rs(qname(State))]), + State1 = State#q{queue_monitors = dict:erase(QPid, QMons)}, case gb_trees:lookup(QPid, UQM) of none -> - noreply(State); + noreply(State1); {value, MsgSeqNosSet} -> case rabbit_misc:is_abnormal_termination(Reason) of true -> rabbit_log:warning( @@ -801,9 +791,7 @@ handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons, [gb_sets:size(MsgSeqNosSet)]); false -> ok end, - handle_confirm(gb_sets:to_list(MsgSeqNosSet), QPid, - State#q{queue_monitors = - dict:erase(QPid, QMons)}) + handle_confirm(gb_sets:to_list(MsgSeqNosSet), QPid, State1) end end. @@ -1228,7 +1216,18 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), noreply(subtract_acks( ChPid, AckTags, State, - fun (State1) -> requeue_and_run(AckTags, State1) end)). + fun (State1) -> requeue_and_run(AckTags, State1) end)); + +handle_call(force_event_refresh, _From, + State = #q{exclusive_consumer = Exclusive}) -> + rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), + case Exclusive of + none -> [emit_consumer_created(Ch, CTag, false, AckRequired) || + {Ch, CTag, AckRequired} <- consumers(State)]; + {Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State), + emit_consumer_created(Ch, CTag, true, AckRequired) + end, + reply(ok, State). handle_cast({confirm, MsgSeqNos, QPid}, State) -> handle_confirm(MsgSeqNos, QPid, State); @@ -1271,14 +1270,14 @@ handle_cast({ack, AckTags, ChPid}, State) -> handle_cast({reject, AckTags, Requeue, ChPid}, State) -> noreply(subtract_acks( ChPid, AckTags, State, - fun (State1 = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - case Requeue of - true -> requeue_and_run(AckTags, State1); - false -> Fun = dead_letter_fun(rejected, State), + case Requeue of + true -> fun (State1) -> requeue_and_run(AckTags, State1) end; + false -> Fun = dead_letter_fun(rejected, State), + fun (State1 = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> BQS1 = BQ:fold(Fun, BQS, AckTags), State1#q{backing_queue_state = BQS1} - end + end end)); handle_cast(delete_immediately, State) -> @@ -1325,16 +1324,6 @@ handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State); -handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) -> - rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), - case Exclusive of - none -> [emit_consumer_created(Ch, CTag, false, AckRequired) || - {Ch, CTag, AckRequired} <- consumers(State)]; - {Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State), - emit_consumer_created(Ch, CTag, true, AckRequired) - end, - noreply(State); - handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) -> dead_letter_msg(Msg, AckTag, Reason, State). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index bb6636eb..cac622f8 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1411,11 +1411,10 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> send_nacks([], State) -> State; send_nacks(MXs, State = #ch{tx_status = none}) -> - MsgSeqNos = [ MsgSeqNo || {MsgSeqNo, _} <- MXs ], - coalesce_and_send(MsgSeqNos, + coalesce_and_send([MsgSeqNo || {MsgSeqNo, _} <- MXs], fun(MsgSeqNo, Multiple) -> #'basic.nack'{delivery_tag = MsgSeqNo, - multiple = Multiple} + multiple = Multiple} end, State); send_nacks(_, State) -> maybe_complete_tx(State#ch{tx_status = failed}). diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index f4c425ca..ba0cb04f 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -19,6 +19,7 @@ -behaviour(gen_server). -export([start_link/0]). +-export([filename/0]). -export([gen/0, gen_secure/0, string/2, binary/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -38,6 +39,7 @@ -type(guid() :: binary()). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). +-spec(filename/0 :: () -> string()). -spec(gen/0 :: () -> guid()). -spec(gen_secure/0 :: () -> guid()). -spec(string/2 :: (guid(), any()) -> string()). @@ -51,8 +53,14 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [update_disk_serial()], []). +%% We use this to detect a (possibly rather old) Mnesia directory, +%% since it has existed since at least 1.7.0 (as far back as I cared +%% to go). +filename() -> + filename:join(rabbit_mnesia:dir(), ?SERIAL_FILENAME). + update_disk_serial() -> - Filename = filename:join(rabbit_mnesia:dir(), ?SERIAL_FILENAME), + Filename = filename(), Serial = case rabbit_file:read_term_file(Filename) of {ok, [Num]} -> Num; {error, enoent} -> 0; diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index db7d8ecc..180677fe 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -134,18 +134,17 @@ add_mirror(Queue, MirrorNode) -> Queue, fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) -> case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of - [] -> Result = rabbit_mirror_queue_slave_sup:start_child( - MirrorNode, [Q]), - case Result of + [] -> case rabbit_mirror_queue_slave_sup:start_child( + MirrorNode, [Q]) of {ok, undefined} -> %% Already running ok; - {ok, _Pid} -> + {ok, SPid} -> rabbit_log:info( "Adding mirror of ~s on node ~p: ~p~n", - [rabbit_misc:rs(Name), MirrorNode, Result]), + [rabbit_misc:rs(Name), MirrorNode, SPid]), ok; - _ -> - Result + Other -> + Other end; [_] -> {error, {queue_already_mirrored_on_node, MirrorNode}} end diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index dca3bead..ddf7f574 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -58,6 +58,7 @@ -export([pget/2, pget/3, pget_or_die/2]). -export([format_message_queue/2]). -export([append_rpc_all_nodes/4]). +-export([multi_call/2]). -export([quit/1]). %%---------------------------------------------------------------------------- @@ -200,6 +201,8 @@ -spec(pget_or_die/2 :: (term(), [term()]) -> term() | no_return()). -spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()). -spec(append_rpc_all_nodes/4 :: ([node()], atom(), atom(), [any()]) -> [any()]). +-spec(multi_call/2 :: + ([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}). -spec(quit/1 :: (integer() | string()) -> no_return()). -endif. @@ -880,6 +883,31 @@ append_rpc_all_nodes(Nodes, M, F, A) -> _ -> Res end || Res <- ResL]). +%% A simplified version of gen_server:multi_call/2 with a sane +%% API. This is not in gen_server2 as there is no useful +%% infrastructure there to share. +multi_call(Pids, Req) -> + MonitorPids = [start_multi_call(Pid, Req) || Pid <- Pids], + receive_multi_call(MonitorPids, [], []). + +start_multi_call(Pid, Req) when is_pid(Pid) -> + Mref = erlang:monitor(process, Pid), + Pid ! {'$gen_call', {self(), Mref}, Req}, + {Mref, Pid}. + +receive_multi_call([], Good, Bad) -> + {lists:reverse(Good), lists:reverse(Bad)}; +receive_multi_call([{Mref, Pid} | MonitorPids], Good, Bad) -> + receive + {Mref, Reply} -> + erlang:demonitor(Mref, [flush]), + receive_multi_call(MonitorPids, [{Pid, Reply} | Good], Bad); + {'DOWN', Mref, _, _, noconnection} -> + receive_multi_call(MonitorPids, Good, [{Pid, nodedown} | Bad]); + {'DOWN', Mref, _, _, Reason} -> + receive_multi_call(MonitorPids, Good, [{Pid, Reason} | Bad]) + end. + %% the slower shutdown on windows required to flush stdout quit(Status) -> case os:type() of diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 4d419fd9..c714d3a7 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -508,7 +508,7 @@ init_db(ClusterNodes, Force) -> ok -> ok; %% If we're just starting up a new node we won't have a %% version - version_not_available -> ok = rabbit_version:record_desired() + starting_from_scratch -> ok = rabbit_version:record_desired() end end). diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index 02889b93..e6a05335 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -19,7 +19,8 @@ -export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2, recv/1, async_recv/3, port_command/2, setopts/2, send/2, close/1, - sockname/1, peername/1, peercert/1, connection_string/2]). + maybe_fast_close/1, sockname/1, peername/1, peercert/1, + connection_string/2]). %%--------------------------------------------------------------------------- @@ -53,6 +54,7 @@ binary()}]) -> ok_or_any_error()). -spec(send/2 :: (socket(), binary() | iolist()) -> ok_or_any_error()). -spec(close/1 :: (socket()) -> ok_or_any_error()). +-spec(maybe_fast_close/1 :: (socket()) -> ok_or_any_error()). -spec(sockname/1 :: (socket()) -> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})). @@ -135,6 +137,9 @@ send(Sock, Data) when is_port(Sock) -> gen_tcp:send(Sock, Data). close(Sock) when ?IS_SSL(Sock) -> ssl:close(Sock#ssl_socket.ssl); close(Sock) when is_port(Sock) -> gen_tcp:close(Sock). +maybe_fast_close(Sock) when ?IS_SSL(Sock) -> ok; +maybe_fast_close(Sock) when is_port(Sock) -> erlang:port_close(Sock), ok. + sockname(Sock) when ?IS_SSL(Sock) -> ssl:sockname(Sock#ssl_socket.ssl); sockname(Sock) when is_port(Sock) -> inet:sockname(Sock). diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl index 329c07dc..9a972d9e 100644 --- a/src/rabbit_nodes.erl +++ b/src/rabbit_nodes.erl @@ -39,15 +39,15 @@ names(Hostname) -> Self = self(), - process_flag(trap_exit, true), - Pid = spawn_link(fun () -> Self ! {names, net_adm:names(Hostname)} end), + Ref = make_ref(), + {Pid, MRef} = spawn_monitor( + fun () -> Self ! {Ref, net_adm:names(Hostname)} end), timer:exit_after(?EPMD_TIMEOUT, Pid, timeout), - Res = receive - {names, Names} -> Names; - {'EXIT', Pid, Reason} -> {error, Reason} - end, - process_flag(trap_exit, false), - Res. + receive + {Ref, Names} -> erlang:demonitor(MRef, [flush]), + Names; + {'DOWN', MRef, process, Pid, Reason} -> {error, Reason} + end. diagnostics(Nodes) -> Hosts = lists:usort([element(2, parts(Node)) || Node <- Nodes]), diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 3d07e8b0..3ef769c7 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -21,6 +21,8 @@ publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). +-export([scan/3]). + -export([add_queue_ttl/0]). -define(CLEAN_FILENAME, "clean.dot"). @@ -219,6 +221,12 @@ {non_neg_integer(), non_neg_integer(), qistate()}). -spec(recover/1 :: ([rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}). +-spec(scan/3 :: (file:filename(), + fun ((seq_id(), rabbit_types:msg_id(), + rabbit_types:message_properties(), boolean(), + ('del' | 'no_del'), ('ack' | 'no_ack'), A) -> A), + A) -> A). + -spec(add_queue_ttl/0 :: () -> 'ok'). -endif. @@ -378,7 +386,10 @@ all_queue_directory_names(Dir) -> %%---------------------------------------------------------------------------- blank_state(QueueName) -> - Dir = filename:join(queues_dir(), queue_name_to_dir_name(QueueName)), + blank_state_dir( + filename:join(queues_dir(), queue_name_to_dir_name(QueueName))). + +blank_state_dir(Dir) -> {ok, MaxJournal} = application:get_env(rabbit, queue_index_max_journal_entries), #qistate { dir = Dir, @@ -523,19 +534,34 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) -> end. queue_index_walker_reader(QueueName, Gatherer) -> - State = #qistate { segments = Segments, dir = Dir } = - recover_journal(blank_state(QueueName)), - [ok = segment_entries_foldr( - fun (_RelSeq, {{MsgId, _MsgProps, true}, _IsDelivered, no_ack}, - ok) -> - gatherer:in(Gatherer, {MsgId, 1}); - (_RelSeq, _Value, Acc) -> - Acc - end, ok, segment_find_or_new(Seg, Dir, Segments)) || - Seg <- all_segment_nums(State)], - {_SegmentCounts, _State} = terminate(State), + State = blank_state(QueueName), + ok = scan_segments( + fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) -> + gatherer:in(Gatherer, {MsgId, 1}); + (_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered, + _IsAcked, Acc) -> + Acc + end, ok, State), ok = gatherer:finish(Gatherer). +scan(Dir, Fun, Acc) -> + scan_segments(Fun, Acc, blank_state_dir(Dir)). + +scan_segments(Fun, Acc, State) -> + State1 = #qistate { segments = Segments, dir = Dir } = + recover_journal(State), + Result = lists:foldr( + fun (Seg, AccN) -> + segment_entries_foldr( + fun (RelSeq, {{MsgId, MsgProps, IsPersistent}, + IsDelivered, IsAcked}, AccM) -> + Fun(reconstruct_seq_id(Seg, RelSeq), MsgId, MsgProps, + IsPersistent, IsDelivered, IsAcked, AccM) + end, AccN, segment_find_or_new(Seg, Dir, Segments)) + end, Acc, all_segment_nums(State1)), + {_SegmentCounts, _State} = terminate(State1), + Result. + %%---------------------------------------------------------------------------- %% expiry/binary manipulation %%---------------------------------------------------------------------------- diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index add13043..47e796dc 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -233,12 +233,15 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, end, "closing AMQP connection ~p (~s):~n~p~n", [self(), ConnStr, Ex]) after - %% We don't close the socket explicitly. The reader is the - %% controlling process and hence its termination will close - %% the socket. Furthermore, gen_tcp:close/1 waits for pending - %% output to be sent, which results in unnecessary delays. + %% The reader is the controlling process and hence its + %% termination will close the socket. Furthermore, + %% gen_tcp:close/1 waits for pending output to be sent, which + %% results in unnecessary delays. However, to keep the + %% file_handle_cache accounting as accurate as possible it + %% would be good to close the socket immediately if we + %% can. But we can only do this for non-ssl sockets. %% - %% gen_tcp:close(ClientSock), + rabbit_net:maybe_fast_close(ClientSock), rabbit_event:notify(connection_closed, [{pid, self()}]) end, done. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 629224b2..55e4a6f8 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -35,6 +35,7 @@ all_tests() -> passed = mirrored_supervisor_tests:all_tests(), application:set_env(rabbit, file_handles_high_watermark, 10, infinity), ok = file_handle_cache:set_limit(10), + passed = test_multi_call(), passed = test_file_handle_cache(), passed = test_backing_queue(), passed = test_priority_queue(), @@ -107,6 +108,26 @@ run_cluster_dependent_tests(SecondaryNode) -> passed. +test_multi_call() -> + Fun = fun() -> + receive + {'$gen_call', {From, Mref}, request} -> + From ! {Mref, response} + end, + receive + never -> ok + end + end, + Pid1 = spawn(Fun), + Pid2 = spawn(Fun), + Pid3 = spawn(Fun), + exit(Pid2, bang), + {[{Pid1, response}, {Pid3, response}], [{Pid2, _Fail}]} = + rabbit_misc:multi_call([Pid1, Pid2, Pid3], request), + exit(Pid1, bang), + exit(Pid3, bang), + passed. + test_priority_queue() -> false = priority_queue:is_queue(not_a_queue), diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 80f50b38..e1a7bcae 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -28,7 +28,9 @@ -ifdef(use_specs). -spec(maybe_upgrade_mnesia/0 :: () -> 'ok'). --spec(maybe_upgrade_local/0 :: () -> 'ok' | 'version_not_available'). +-spec(maybe_upgrade_local/0 :: () -> 'ok' | + 'version_not_available' | + 'starting_from_scratch'). -endif. @@ -119,8 +121,13 @@ remove_backup() -> info("upgrades: Mnesia backup removed~n", []). maybe_upgrade_mnesia() -> - AllNodes = rabbit_mnesia:all_clustered_nodes(), + %% rabbit_mnesia:all_clustered_nodes/0 will return [] at this point + %% if we are a RAM node since Mnesia has not started yet. + AllNodes = lists:usort(rabbit_mnesia:all_clustered_nodes() ++ + rabbit_mnesia:read_cluster_nodes_config()), case rabbit_version:upgrades_required(mnesia) of + {error, starting_from_scratch} -> + ok; {error, version_not_available} -> case AllNodes of [_] -> ok; @@ -235,6 +242,7 @@ nodes_running(Nodes) -> maybe_upgrade_local() -> case rabbit_version:upgrades_required(local) of {error, version_not_available} -> version_not_available; + {error, starting_from_scratch} -> starting_from_scratch; {error, _} = Err -> throw(Err); {ok, []} -> ensure_backup_removed(), ok; diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl index 7545d813..1cc7d6c8 100644 --- a/src/rabbit_version.erl +++ b/src/rabbit_version.erl @@ -96,7 +96,10 @@ record_desired_for_scope(Scope) -> upgrades_required(Scope) -> case recorded_for_scope(Scope) of {error, enoent} -> - {error, version_not_available}; + case filelib:is_file(rabbit_guid:filename()) of + false -> {error, starting_from_scratch}; + true -> {error, version_not_available} + end; {ok, CurrentHeads} -> with_upgrade_graph( fun (G) -> diff --git a/src/supervisor2.erl b/src/supervisor2.erl index a2f4fae9..8dd8aba8 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -9,15 +9,15 @@ %% terminated as per the shutdown component of the child_spec. %% %% 3) child specifications can contain, as the restart type, a tuple -%% {permanent, Delay} | {transient, Delay} where Delay >= 0. The -%% delay, in seconds, indicates what should happen if a child, upon -%% being restarted, exceeds the MaxT and MaxR parameters. Thus, if -%% a child exits, it is restarted as normal. If it exits -%% sufficiently quickly and often to exceed the boundaries set by -%% the MaxT and MaxR parameters, and a Delay is specified, then -%% rather than stopping the supervisor, the supervisor instead -%% continues and tries to start up the child again, Delay seconds -%% later. +%% {permanent, Delay} | {transient, Delay} | {intrinsic, Delay} +%% where Delay >= 0 (see point (4) below for intrinsic). The delay, +%% in seconds, indicates what should happen if a child, upon being +%% restarted, exceeds the MaxT and MaxR parameters. Thus, if a +%% child exits, it is restarted as normal. If it exits sufficiently +%% quickly and often to exceed the boundaries set by the MaxT and +%% MaxR parameters, and a Delay is specified, then rather than +%% stopping the supervisor, the supervisor instead continues and +%% tries to start up the child again, Delay seconds later. %% %% Note that you can never restart more frequently than the MaxT %% and MaxR parameters allow: i.e. you must wait until *both* the @@ -31,6 +31,16 @@ %% the MaxT and MaxR parameters to permit the child to be %% restarted. This may require waiting for longer than Delay. %% +%% Sometimes, you may wish for a transient or intrinsic child to +%% exit abnormally so that it gets restarted, but still log +%% nothing. gen_server will log any exit reason other than +%% 'normal', 'shutdown' or {'shutdown', _}. Thus the exit reason of +%% {'shutdown', 'restart'} is interpreted to mean you wish the +%% child to be restarted according to the delay parameters, but +%% gen_server will not log the error. Thus from gen_server's +%% perspective it's a normal exit, whilst from supervisor's +%% perspective, it's an abnormal exit. +%% %% 4) Added an 'intrinsic' restart type. Like the transient type, this %% type means the child should only be restarted if the child exits %% abnormally. Unlike the transient type, if the child exits @@ -529,25 +539,23 @@ restart_child(Pid, Reason, State) -> {ok, State} end. -do_restart({RestartType, Delay}, Reason, Child, State) -> - case restart1(Child, State) of - {ok, NState} -> - {ok, NState}; - {terminate, NState} -> - _TRef = erlang:send_after(trunc(Delay*1000), self(), - {delayed_restart, - {{RestartType, Delay}, Reason, Child}}), - {ok, state_del_child(Child, NState)} - end; +do_restart({permanent = RestartType, Delay}, Reason, Child, State) -> + do_restart_delay({RestartType, Delay}, Reason, Child, State); do_restart(permanent, Reason, Child, State) -> report_error(child_terminated, Reason, Child, State#state.name), restart(Child, State); do_restart(Type, normal, Child, State) -> del_child_and_maybe_shutdown(Type, Child, State); +do_restart({RestartType, Delay}, {shutdown, restart} = Reason, Child, State) + when RestartType =:= transient orelse RestartType =:= intrinsic -> + do_restart_delay({RestartType, Delay}, Reason, Child, State); do_restart(Type, {shutdown, _}, Child, State) -> del_child_and_maybe_shutdown(Type, Child, State); do_restart(Type, shutdown, Child = #child{child_type = supervisor}, State) -> del_child_and_maybe_shutdown(Type, Child, State); +do_restart({RestartType, Delay}, Reason, Child, State) + when RestartType =:= transient orelse RestartType =:= intrinsic -> + do_restart_delay({RestartType, Delay}, Reason, Child, State); do_restart(Type, Reason, Child, State) when Type =:= transient orelse Type =:= intrinsic -> report_error(child_terminated, Reason, Child, State#state.name), @@ -557,8 +565,21 @@ do_restart(temporary, Reason, Child, State) -> NState = state_del_child(Child, State), {ok, NState}. +do_restart_delay({RestartType, Delay}, Reason, Child, State) -> + case restart1(Child, State) of + {ok, NState} -> + {ok, NState}; + {terminate, NState} -> + _TRef = erlang:send_after(trunc(Delay*1000), self(), + {delayed_restart, + {{RestartType, Delay}, Reason, Child}}), + {ok, state_del_child(Child, NState)} + end. + del_child_and_maybe_shutdown(intrinsic, Child, State) -> {shutdown, state_del_child(Child, State)}; +del_child_and_maybe_shutdown({intrinsic, _Delay}, Child, State) -> + {shutdown, state_del_child(Child, State)}; del_child_and_maybe_shutdown(_, Child, State) -> {ok, state_del_child(Child, State)}. @@ -911,7 +932,8 @@ supname(N,_) -> N. %%% Func is {Mod, Fun, Args} == {atom, atom, list} %%% RestartType is permanent | temporary | transient | %%% intrinsic | {permanent, Delay} | -%%% {transient, Delay} where Delay >= 0 +%%% {transient, Delay} | {intrinsic, Delay} +%% where Delay >= 0 %%% Shutdown = integer() | infinity | brutal_kill %%% ChildType = supervisor | worker %%% Modules = [atom()] | dynamic @@ -962,6 +984,7 @@ validRestartType(temporary) -> true; validRestartType(transient) -> true; validRestartType(intrinsic) -> true; validRestartType({permanent, Delay}) -> validDelay(Delay); +validRestartType({intrinsic, Delay}) -> validDelay(Delay); validRestartType({transient, Delay}) -> validDelay(Delay); validRestartType(RestartType) -> throw({invalid_restart_type, RestartType}). diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index 43a6bc99..344196d7 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -69,13 +69,7 @@ handle_info({inet_async, LSock, Ref, {error, closed}}, handle_info({inet_async, LSock, Ref, {error, Reason}}, State=#state{sock=LSock, ref=Ref}) -> - {AddressS, Port} = case inet:sockname(LSock) of - {ok, {A, P}} -> {rabbit_misc:ntoab(A), P}; - {error, _} -> {"unknown", unknown} - end, - error_logger:error_msg("failed to accept TCP connection on ~s:~p: ~p~n", - [AddressS, Port, Reason]), - accept(State); + {stop, {accept_failed, Reason}, State}; handle_info(_Info, State) -> {noreply, State}. |