summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--packaging/RPMS/Fedora/Makefile3
-rw-r--r--packaging/common/rabbitmq-server.init5
-rw-r--r--packaging/debs/Debian/Makefile1
-rw-r--r--packaging/windows/Makefile5
-rw-r--r--src/credit_flow.erl3
-rw-r--r--src/rabbit_amqqueue.erl23
-rw-r--r--src/rabbit_amqqueue_process.erl89
-rw-r--r--src/rabbit_channel.erl5
-rw-r--r--src/rabbit_guid.erl10
-rw-r--r--src/rabbit_mirror_queue_misc.erl13
-rw-r--r--src/rabbit_misc.erl28
-rw-r--r--src/rabbit_mnesia.erl2
-rw-r--r--src/rabbit_net.erl7
-rw-r--r--src/rabbit_nodes.erl16
-rw-r--r--src/rabbit_queue_index.erl50
-rw-r--r--src/rabbit_reader.erl13
-rw-r--r--src/rabbit_tests.erl21
-rw-r--r--src/rabbit_upgrade.erl12
-rw-r--r--src/rabbit_version.erl5
-rw-r--r--src/supervisor2.erl63
-rw-r--r--src/tcp_acceptor.erl8
21 files changed, 255 insertions, 127 deletions
diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile
index c67d8fd6..234fc2c7 100644
--- a/packaging/RPMS/Fedora/Makefile
+++ b/packaging/RPMS/Fedora/Makefile
@@ -15,9 +15,11 @@ endif
ifeq "$(RPM_OS)" "suse"
REQUIRES=/sbin/chkconfig /sbin/service
OS_DEFINES=--define '_initrddir /etc/init.d' --define 'dist .suse'
+START_PROG=setsid
else
REQUIRES=chkconfig initscripts
OS_DEFINES=--define '_initrddir /etc/rc.d/init.d'
+START_PROG=runuser rabbitmq --session-command
endif
rpms: clean server
@@ -32,6 +34,7 @@ prepare:
cp ${COMMON_DIR}/* SOURCES/
sed -i \
-e 's|^LOCK_FILE=.*$$|LOCK_FILE=/var/lock/subsys/$$NAME|' \
+ -e 's|^START_PROG=.*$$|START_PROG="$(START_PROG)"|' \
SOURCES/rabbitmq-server.init
ifeq "$(RPM_OS)" "fedora"
# Fedora says that only vital services should have Default-Start
diff --git a/packaging/common/rabbitmq-server.init b/packaging/common/rabbitmq-server.init
index 4084d8c7..c942f8e3 100644
--- a/packaging/common/rabbitmq-server.init
+++ b/packaging/common/rabbitmq-server.init
@@ -26,7 +26,8 @@ ROTATE_SUFFIX=
INIT_LOG_DIR=/var/log/rabbitmq
PID_FILE=/var/run/rabbitmq/pid
-LOCK_FILE= # This is filled in when building packages
+START_PROG= # Set when building package
+LOCK_FILE= # Set when building package
test -x $DAEMON || exit 0
test -x $CONTROL || exit 0
@@ -56,7 +57,7 @@ start_rabbitmq () {
RETVAL=0
ensure_pid_dir
set +e
- RABBITMQ_PID_FILE=$PID_FILE setsid $DAEMON \
+ RABBITMQ_PID_FILE=$PID_FILE $START_PROG $DAEMON \
> "${INIT_LOG_DIR}/startup_log" \
2> "${INIT_LOG_DIR}/startup_err" \
0<&- &
diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile
index 79e9c1dd..2a738f6e 100644
--- a/packaging/debs/Debian/Makefile
+++ b/packaging/debs/Debian/Makefile
@@ -26,6 +26,7 @@ package: clean
# runlevel 2 should start network services.
sed -i \
-e 's|^LOCK_FILE=.*$$|LOCK_FILE=|' \
+ -e 's|^START_PROG=.*$$|START_PROG="start-stop-daemon -v --chuid rabbitmq --start --exec"|' \
-e 's|^\(# Default-Start:\).*$$|\1 2 3 4 5|' \
-e 's|^\(# Default-Stop:\).*$$|\1 0 1 6|' \
$(UNPACKED_DIR)/debian/rabbitmq-server.init
diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile
index a910941b..1c222162 100644
--- a/packaging/windows/Makefile
+++ b/packaging/windows/Makefile
@@ -8,10 +8,7 @@ dist:
$(MAKE) -C $(SOURCE_DIR)
mkdir $(SOURCE_DIR)/sbin
- mv $(SOURCE_DIR)/scripts/rabbitmq-server.bat $(SOURCE_DIR)/sbin
- mv $(SOURCE_DIR)/scripts/rabbitmq-service.bat $(SOURCE_DIR)/sbin
- mv $(SOURCE_DIR)/scripts/rabbitmq-plugins.bat $(SOURCE_DIR)/sbin
- mv $(SOURCE_DIR)/scripts/rabbitmqctl.bat $(SOURCE_DIR)/sbin
+ mv $(SOURCE_DIR)/scripts/*.bat $(SOURCE_DIR)/sbin
rm -rf $(SOURCE_DIR)/scripts
rm -rf $(SOURCE_DIR)/codegen* $(SOURCE_DIR)/Makefile $(SOURCE_DIR)/*mk
rm -f $(SOURCE_DIR)/README
diff --git a/src/credit_flow.erl b/src/credit_flow.erl
index 072f4d9d..ba99811f 100644
--- a/src/credit_flow.erl
+++ b/src/credit_flow.erl
@@ -96,7 +96,8 @@ peer_down(Peer) ->
%% credit_deferred and thus send messages into the void...
unblock(Peer),
erase({credit_from, Peer}),
- erase({credit_to, Peer}).
+ erase({credit_to, Peer}),
+ ok.
%% --------------------------------------------------------------------------
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 48236ca5..c2724a12 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -42,6 +42,8 @@
-define(MORE_CONSUMER_CREDIT_AFTER, 50).
+-define(FAILOVER_WAIT_MILLIS, 100).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -421,9 +423,26 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
+%% We need to account for the idea that queues may be mid-promotion
+%% during force_event_refresh (since it's likely we're doing this in
+%% the first place since a node failed). Therefore we keep poking at
+%% the list of queues until we were able to talk to a live process or
+%% the queue no longer exists.
force_event_refresh() ->
- [gen_server2:cast(Q#amqqueue.pid, force_event_refresh) || Q <- list()],
- ok.
+ force_event_refresh([Q#amqqueue.name || Q <- list()]).
+
+force_event_refresh(QNames) ->
+ Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)],
+ {_, Bad} = rabbit_misc:multi_call(
+ [Q#amqqueue.pid || Q <- Qs], force_event_refresh),
+ FailedPids = [Pid || {Pid, _Reason} <- Bad],
+ Failed = [Name || #amqqueue{name = Name, pid = Pid} <- Qs,
+ lists:member(Pid, FailedPids)],
+ case Failed of
+ [] -> ok;
+ _ -> timer:sleep(?FAILOVER_WAIT_MILLIS),
+ force_event_refresh(Failed)
+ end.
consumers(#amqqueue{ pid = QPid }) ->
delegate_call(QPid, consumers).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index ee547138..e1fd9bbc 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -24,9 +24,6 @@
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
--define(BASE_MESSAGE_PROPERTIES,
- #message_properties{expiry = undefined, needs_confirming = false}).
-
-export([start_link/1, info_keys/0]).
-export([init_with_backing_queue_state/7]).
@@ -330,12 +327,10 @@ ensure_expiry_timer(State = #q{expires = undefined}) ->
State;
ensure_expiry_timer(State = #q{expires = Expires}) ->
case is_unused(State) of
- true ->
- NewState = stop_expiry_timer(State),
- TRef = erlang:send_after(Expires, self(), maybe_expire),
- NewState#q{expiry_timer_ref = TRef};
- false ->
- State
+ true -> NewState = stop_expiry_timer(State),
+ TRef = erlang:send_after(Expires, self(), maybe_expire),
+ NewState#q{expiry_timer_ref = TRef};
+ false -> State
end.
ensure_stats_timer(State) ->
@@ -529,15 +524,10 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid,
{false, BQS1} ->
DeliverFun =
fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) ->
- %% we don't need an expiry here because
- %% messages are not being enqueued, so we use
- %% an empty message_properties.
- {AckTag, BQS3} =
- BQ:publish_delivered(
- AckRequired, Message,
- (?BASE_MESSAGE_PROPERTIES)#message_properties{
- needs_confirming = needs_confirming(Confirm)},
- SenderPid, BQS2),
+ Props = message_properties(Confirm, State1),
+ {AckTag, BQS3} = BQ:publish_delivered(
+ AckRequired, Message, Props,
+ SenderPid, BQS2),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS3}}
end,
@@ -564,8 +554,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
maybe_record_confirm_message(Confirm, State1),
case Delivered of
true -> State2;
- false -> Props = (message_properties(State)) #message_properties{
- needs_confirming = needs_confirming(Confirm)},
+ false -> Props = message_properties(Confirm, State),
BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
end.
@@ -693,8 +682,9 @@ discard_delivery(#delivery{sender = SenderPid,
backing_queue_state = BQS}) ->
State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}.
-message_properties(#q{ttl=TTL}) ->
- #message_properties{expiry = calculate_msg_expiry(TTL)}.
+message_properties(Confirm, #q{ttl = TTL}) ->
+ #message_properties{expiry = calculate_msg_expiry(TTL),
+ needs_confirming = needs_confirming(Confirm)}.
calculate_msg_expiry(undefined) -> undefined;
calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).
@@ -732,10 +722,9 @@ dead_letter_fun(Reason, _State) ->
dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) ->
case rabbit_exchange:lookup(DLX) of
- {error, not_found} ->
- noreply(State);
- _ ->
- dead_letter_msg_existing_dlx(Msg, AckTag, Reason, State)
+ {error, not_found} -> noreply(State);
+ _ -> dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
+ State)
end.
dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
@@ -752,7 +741,7 @@ dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
State1 = lists:foldl(fun monitor_queue/2, State, QPids),
State2 = State1#q{publish_seqno = MsgSeqNo + 1},
case QPids of
- [] -> {_, BQS1} = BQ:ack([AckTag], BQS),
+ [] -> {_Guids, BQS1} = BQ:ack([AckTag], BQS),
cleanup_after_confirm(State2#q{backing_queue_state = BQS1});
_ -> State3 =
lists:foldl(
@@ -789,11 +778,12 @@ handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
error ->
noreply(State);
{ok, _} ->
- #resource{name = QName} = qname(State),
- rabbit_log:info("DLQ ~p (for ~p) died~n", [QPid, QName]),
+ rabbit_log:info("DLQ ~p (for ~s) died~n",
+ [QPid, rabbit_misc:rs(qname(State))]),
+ State1 = State#q{queue_monitors = dict:erase(QPid, QMons)},
case gb_trees:lookup(QPid, UQM) of
none ->
- noreply(State);
+ noreply(State1);
{value, MsgSeqNosSet} ->
case rabbit_misc:is_abnormal_termination(Reason) of
true -> rabbit_log:warning(
@@ -801,9 +791,7 @@ handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
[gb_sets:size(MsgSeqNosSet)]);
false -> ok
end,
- handle_confirm(gb_sets:to_list(MsgSeqNosSet), QPid,
- State#q{queue_monitors =
- dict:erase(QPid, QMons)})
+ handle_confirm(gb_sets:to_list(MsgSeqNosSet), QPid, State1)
end
end.
@@ -1228,7 +1216,18 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
noreply(subtract_acks(
ChPid, AckTags, State,
- fun (State1) -> requeue_and_run(AckTags, State1) end)).
+ fun (State1) -> requeue_and_run(AckTags, State1) end));
+
+handle_call(force_event_refresh, _From,
+ State = #q{exclusive_consumer = Exclusive}) ->
+ rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
+ case Exclusive of
+ none -> [emit_consumer_created(Ch, CTag, false, AckRequired) ||
+ {Ch, CTag, AckRequired} <- consumers(State)];
+ {Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State),
+ emit_consumer_created(Ch, CTag, true, AckRequired)
+ end,
+ reply(ok, State).
handle_cast({confirm, MsgSeqNos, QPid}, State) ->
handle_confirm(MsgSeqNos, QPid, State);
@@ -1271,14 +1270,14 @@ handle_cast({ack, AckTags, ChPid}, State) ->
handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
noreply(subtract_acks(
ChPid, AckTags, State,
- fun (State1 = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- case Requeue of
- true -> requeue_and_run(AckTags, State1);
- false -> Fun = dead_letter_fun(rejected, State),
+ case Requeue of
+ true -> fun (State1) -> requeue_and_run(AckTags, State1) end;
+ false -> Fun = dead_letter_fun(rejected, State),
+ fun (State1 = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
BQS1 = BQ:fold(Fun, BQS, AckTags),
State1#q{backing_queue_state = BQS1}
- end
+ end
end));
handle_cast(delete_immediately, State) ->
@@ -1325,16 +1324,6 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
-handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) ->
- rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
- case Exclusive of
- none -> [emit_consumer_created(Ch, CTag, false, AckRequired) ||
- {Ch, CTag, AckRequired} <- consumers(State)];
- {Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State),
- emit_consumer_created(Ch, CTag, true, AckRequired)
- end,
- noreply(State);
-
handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) ->
dead_letter_msg(Msg, AckTag, Reason, State).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index bb6636eb..cac622f8 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1411,11 +1411,10 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
send_nacks([], State) ->
State;
send_nacks(MXs, State = #ch{tx_status = none}) ->
- MsgSeqNos = [ MsgSeqNo || {MsgSeqNo, _} <- MXs ],
- coalesce_and_send(MsgSeqNos,
+ coalesce_and_send([MsgSeqNo || {MsgSeqNo, _} <- MXs],
fun(MsgSeqNo, Multiple) ->
#'basic.nack'{delivery_tag = MsgSeqNo,
- multiple = Multiple}
+ multiple = Multiple}
end, State);
send_nacks(_, State) ->
maybe_complete_tx(State#ch{tx_status = failed}).
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index f4c425ca..ba0cb04f 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -19,6 +19,7 @@
-behaviour(gen_server).
-export([start_link/0]).
+-export([filename/0]).
-export([gen/0, gen_secure/0, string/2, binary/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@@ -38,6 +39,7 @@
-type(guid() :: binary()).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-spec(filename/0 :: () -> string()).
-spec(gen/0 :: () -> guid()).
-spec(gen_secure/0 :: () -> guid()).
-spec(string/2 :: (guid(), any()) -> string()).
@@ -51,8 +53,14 @@ start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE,
[update_disk_serial()], []).
+%% We use this to detect a (possibly rather old) Mnesia directory,
+%% since it has existed since at least 1.7.0 (as far back as I cared
+%% to go).
+filename() ->
+ filename:join(rabbit_mnesia:dir(), ?SERIAL_FILENAME).
+
update_disk_serial() ->
- Filename = filename:join(rabbit_mnesia:dir(), ?SERIAL_FILENAME),
+ Filename = filename(),
Serial = case rabbit_file:read_term_file(Filename) of
{ok, [Num]} -> Num;
{error, enoent} -> 0;
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index db7d8ecc..180677fe 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -134,18 +134,17 @@ add_mirror(Queue, MirrorNode) ->
Queue,
fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) ->
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
- [] -> Result = rabbit_mirror_queue_slave_sup:start_child(
- MirrorNode, [Q]),
- case Result of
+ [] -> case rabbit_mirror_queue_slave_sup:start_child(
+ MirrorNode, [Q]) of
{ok, undefined} -> %% Already running
ok;
- {ok, _Pid} ->
+ {ok, SPid} ->
rabbit_log:info(
"Adding mirror of ~s on node ~p: ~p~n",
- [rabbit_misc:rs(Name), MirrorNode, Result]),
+ [rabbit_misc:rs(Name), MirrorNode, SPid]),
ok;
- _ ->
- Result
+ Other ->
+ Other
end;
[_] -> {error, {queue_already_mirrored_on_node, MirrorNode}}
end
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index dca3bead..ddf7f574 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -58,6 +58,7 @@
-export([pget/2, pget/3, pget_or_die/2]).
-export([format_message_queue/2]).
-export([append_rpc_all_nodes/4]).
+-export([multi_call/2]).
-export([quit/1]).
%%----------------------------------------------------------------------------
@@ -200,6 +201,8 @@
-spec(pget_or_die/2 :: (term(), [term()]) -> term() | no_return()).
-spec(format_message_queue/2 :: (any(), priority_queue:q()) -> term()).
-spec(append_rpc_all_nodes/4 :: ([node()], atom(), atom(), [any()]) -> [any()]).
+-spec(multi_call/2 ::
+ ([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}).
-spec(quit/1 :: (integer() | string()) -> no_return()).
-endif.
@@ -880,6 +883,31 @@ append_rpc_all_nodes(Nodes, M, F, A) ->
_ -> Res
end || Res <- ResL]).
+%% A simplified version of gen_server:multi_call/2 with a sane
+%% API. This is not in gen_server2 as there is no useful
+%% infrastructure there to share.
+multi_call(Pids, Req) ->
+ MonitorPids = [start_multi_call(Pid, Req) || Pid <- Pids],
+ receive_multi_call(MonitorPids, [], []).
+
+start_multi_call(Pid, Req) when is_pid(Pid) ->
+ Mref = erlang:monitor(process, Pid),
+ Pid ! {'$gen_call', {self(), Mref}, Req},
+ {Mref, Pid}.
+
+receive_multi_call([], Good, Bad) ->
+ {lists:reverse(Good), lists:reverse(Bad)};
+receive_multi_call([{Mref, Pid} | MonitorPids], Good, Bad) ->
+ receive
+ {Mref, Reply} ->
+ erlang:demonitor(Mref, [flush]),
+ receive_multi_call(MonitorPids, [{Pid, Reply} | Good], Bad);
+ {'DOWN', Mref, _, _, noconnection} ->
+ receive_multi_call(MonitorPids, Good, [{Pid, nodedown} | Bad]);
+ {'DOWN', Mref, _, _, Reason} ->
+ receive_multi_call(MonitorPids, Good, [{Pid, Reason} | Bad])
+ end.
+
%% the slower shutdown on windows required to flush stdout
quit(Status) ->
case os:type() of
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 4d419fd9..c714d3a7 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -508,7 +508,7 @@ init_db(ClusterNodes, Force) ->
ok -> ok;
%% If we're just starting up a new node we won't have a
%% version
- version_not_available -> ok = rabbit_version:record_desired()
+ starting_from_scratch -> ok = rabbit_version:record_desired()
end
end).
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 02889b93..e6a05335 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -19,7 +19,8 @@
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
recv/1, async_recv/3, port_command/2, setopts/2, send/2, close/1,
- sockname/1, peername/1, peercert/1, connection_string/2]).
+ maybe_fast_close/1, sockname/1, peername/1, peercert/1,
+ connection_string/2]).
%%---------------------------------------------------------------------------
@@ -53,6 +54,7 @@
binary()}]) -> ok_or_any_error()).
-spec(send/2 :: (socket(), binary() | iolist()) -> ok_or_any_error()).
-spec(close/1 :: (socket()) -> ok_or_any_error()).
+-spec(maybe_fast_close/1 :: (socket()) -> ok_or_any_error()).
-spec(sockname/1 ::
(socket())
-> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})).
@@ -135,6 +137,9 @@ send(Sock, Data) when is_port(Sock) -> gen_tcp:send(Sock, Data).
close(Sock) when ?IS_SSL(Sock) -> ssl:close(Sock#ssl_socket.ssl);
close(Sock) when is_port(Sock) -> gen_tcp:close(Sock).
+maybe_fast_close(Sock) when ?IS_SSL(Sock) -> ok;
+maybe_fast_close(Sock) when is_port(Sock) -> erlang:port_close(Sock), ok.
+
sockname(Sock) when ?IS_SSL(Sock) -> ssl:sockname(Sock#ssl_socket.ssl);
sockname(Sock) when is_port(Sock) -> inet:sockname(Sock).
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index 329c07dc..9a972d9e 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -39,15 +39,15 @@
names(Hostname) ->
Self = self(),
- process_flag(trap_exit, true),
- Pid = spawn_link(fun () -> Self ! {names, net_adm:names(Hostname)} end),
+ Ref = make_ref(),
+ {Pid, MRef} = spawn_monitor(
+ fun () -> Self ! {Ref, net_adm:names(Hostname)} end),
timer:exit_after(?EPMD_TIMEOUT, Pid, timeout),
- Res = receive
- {names, Names} -> Names;
- {'EXIT', Pid, Reason} -> {error, Reason}
- end,
- process_flag(trap_exit, false),
- Res.
+ receive
+ {Ref, Names} -> erlang:demonitor(MRef, [flush]),
+ Names;
+ {'DOWN', MRef, process, Pid, Reason} -> {error, Reason}
+ end.
diagnostics(Nodes) ->
Hosts = lists:usort([element(2, parts(Node)) || Node <- Nodes]),
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 3d07e8b0..3ef769c7 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -21,6 +21,8 @@
publish/5, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
read/3, next_segment_boundary/1, bounds/1, recover/1]).
+-export([scan/3]).
+
-export([add_queue_ttl/0]).
-define(CLEAN_FILENAME, "clean.dot").
@@ -219,6 +221,12 @@
{non_neg_integer(), non_neg_integer(), qistate()}).
-spec(recover/1 :: ([rabbit_amqqueue:name()]) -> {[[any()]], {walker(A), A}}).
+-spec(scan/3 :: (file:filename(),
+ fun ((seq_id(), rabbit_types:msg_id(),
+ rabbit_types:message_properties(), boolean(),
+ ('del' | 'no_del'), ('ack' | 'no_ack'), A) -> A),
+ A) -> A).
+
-spec(add_queue_ttl/0 :: () -> 'ok').
-endif.
@@ -378,7 +386,10 @@ all_queue_directory_names(Dir) ->
%%----------------------------------------------------------------------------
blank_state(QueueName) ->
- Dir = filename:join(queues_dir(), queue_name_to_dir_name(QueueName)),
+ blank_state_dir(
+ filename:join(queues_dir(), queue_name_to_dir_name(QueueName))).
+
+blank_state_dir(Dir) ->
{ok, MaxJournal} =
application:get_env(rabbit, queue_index_max_journal_entries),
#qistate { dir = Dir,
@@ -523,19 +534,34 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
end.
queue_index_walker_reader(QueueName, Gatherer) ->
- State = #qistate { segments = Segments, dir = Dir } =
- recover_journal(blank_state(QueueName)),
- [ok = segment_entries_foldr(
- fun (_RelSeq, {{MsgId, _MsgProps, true}, _IsDelivered, no_ack},
- ok) ->
- gatherer:in(Gatherer, {MsgId, 1});
- (_RelSeq, _Value, Acc) ->
- Acc
- end, ok, segment_find_or_new(Seg, Dir, Segments)) ||
- Seg <- all_segment_nums(State)],
- {_SegmentCounts, _State} = terminate(State),
+ State = blank_state(QueueName),
+ ok = scan_segments(
+ fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) ->
+ gatherer:in(Gatherer, {MsgId, 1});
+ (_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered,
+ _IsAcked, Acc) ->
+ Acc
+ end, ok, State),
ok = gatherer:finish(Gatherer).
+scan(Dir, Fun, Acc) ->
+ scan_segments(Fun, Acc, blank_state_dir(Dir)).
+
+scan_segments(Fun, Acc, State) ->
+ State1 = #qistate { segments = Segments, dir = Dir } =
+ recover_journal(State),
+ Result = lists:foldr(
+ fun (Seg, AccN) ->
+ segment_entries_foldr(
+ fun (RelSeq, {{MsgId, MsgProps, IsPersistent},
+ IsDelivered, IsAcked}, AccM) ->
+ Fun(reconstruct_seq_id(Seg, RelSeq), MsgId, MsgProps,
+ IsPersistent, IsDelivered, IsAcked, AccM)
+ end, AccN, segment_find_or_new(Seg, Dir, Segments))
+ end, Acc, all_segment_nums(State1)),
+ {_SegmentCounts, _State} = terminate(State1),
+ Result.
+
%%----------------------------------------------------------------------------
%% expiry/binary manipulation
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index add13043..47e796dc 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -233,12 +233,15 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
end, "closing AMQP connection ~p (~s):~n~p~n",
[self(), ConnStr, Ex])
after
- %% We don't close the socket explicitly. The reader is the
- %% controlling process and hence its termination will close
- %% the socket. Furthermore, gen_tcp:close/1 waits for pending
- %% output to be sent, which results in unnecessary delays.
+ %% The reader is the controlling process and hence its
+ %% termination will close the socket. Furthermore,
+ %% gen_tcp:close/1 waits for pending output to be sent, which
+ %% results in unnecessary delays. However, to keep the
+ %% file_handle_cache accounting as accurate as possible it
+ %% would be good to close the socket immediately if we
+ %% can. But we can only do this for non-ssl sockets.
%%
- %% gen_tcp:close(ClientSock),
+ rabbit_net:maybe_fast_close(ClientSock),
rabbit_event:notify(connection_closed, [{pid, self()}])
end,
done.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 629224b2..55e4a6f8 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -35,6 +35,7 @@ all_tests() ->
passed = mirrored_supervisor_tests:all_tests(),
application:set_env(rabbit, file_handles_high_watermark, 10, infinity),
ok = file_handle_cache:set_limit(10),
+ passed = test_multi_call(),
passed = test_file_handle_cache(),
passed = test_backing_queue(),
passed = test_priority_queue(),
@@ -107,6 +108,26 @@ run_cluster_dependent_tests(SecondaryNode) ->
passed.
+test_multi_call() ->
+ Fun = fun() ->
+ receive
+ {'$gen_call', {From, Mref}, request} ->
+ From ! {Mref, response}
+ end,
+ receive
+ never -> ok
+ end
+ end,
+ Pid1 = spawn(Fun),
+ Pid2 = spawn(Fun),
+ Pid3 = spawn(Fun),
+ exit(Pid2, bang),
+ {[{Pid1, response}, {Pid3, response}], [{Pid2, _Fail}]} =
+ rabbit_misc:multi_call([Pid1, Pid2, Pid3], request),
+ exit(Pid1, bang),
+ exit(Pid3, bang),
+ passed.
+
test_priority_queue() ->
false = priority_queue:is_queue(not_a_queue),
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 80f50b38..e1a7bcae 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -28,7 +28,9 @@
-ifdef(use_specs).
-spec(maybe_upgrade_mnesia/0 :: () -> 'ok').
--spec(maybe_upgrade_local/0 :: () -> 'ok' | 'version_not_available').
+-spec(maybe_upgrade_local/0 :: () -> 'ok' |
+ 'version_not_available' |
+ 'starting_from_scratch').
-endif.
@@ -119,8 +121,13 @@ remove_backup() ->
info("upgrades: Mnesia backup removed~n", []).
maybe_upgrade_mnesia() ->
- AllNodes = rabbit_mnesia:all_clustered_nodes(),
+ %% rabbit_mnesia:all_clustered_nodes/0 will return [] at this point
+ %% if we are a RAM node since Mnesia has not started yet.
+ AllNodes = lists:usort(rabbit_mnesia:all_clustered_nodes() ++
+ rabbit_mnesia:read_cluster_nodes_config()),
case rabbit_version:upgrades_required(mnesia) of
+ {error, starting_from_scratch} ->
+ ok;
{error, version_not_available} ->
case AllNodes of
[_] -> ok;
@@ -235,6 +242,7 @@ nodes_running(Nodes) ->
maybe_upgrade_local() ->
case rabbit_version:upgrades_required(local) of
{error, version_not_available} -> version_not_available;
+ {error, starting_from_scratch} -> starting_from_scratch;
{error, _} = Err -> throw(Err);
{ok, []} -> ensure_backup_removed(),
ok;
diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl
index 7545d813..1cc7d6c8 100644
--- a/src/rabbit_version.erl
+++ b/src/rabbit_version.erl
@@ -96,7 +96,10 @@ record_desired_for_scope(Scope) ->
upgrades_required(Scope) ->
case recorded_for_scope(Scope) of
{error, enoent} ->
- {error, version_not_available};
+ case filelib:is_file(rabbit_guid:filename()) of
+ false -> {error, starting_from_scratch};
+ true -> {error, version_not_available}
+ end;
{ok, CurrentHeads} ->
with_upgrade_graph(
fun (G) ->
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index a2f4fae9..8dd8aba8 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -9,15 +9,15 @@
%% terminated as per the shutdown component of the child_spec.
%%
%% 3) child specifications can contain, as the restart type, a tuple
-%% {permanent, Delay} | {transient, Delay} where Delay >= 0. The
-%% delay, in seconds, indicates what should happen if a child, upon
-%% being restarted, exceeds the MaxT and MaxR parameters. Thus, if
-%% a child exits, it is restarted as normal. If it exits
-%% sufficiently quickly and often to exceed the boundaries set by
-%% the MaxT and MaxR parameters, and a Delay is specified, then
-%% rather than stopping the supervisor, the supervisor instead
-%% continues and tries to start up the child again, Delay seconds
-%% later.
+%% {permanent, Delay} | {transient, Delay} | {intrinsic, Delay}
+%% where Delay >= 0 (see point (4) below for intrinsic). The delay,
+%% in seconds, indicates what should happen if a child, upon being
+%% restarted, exceeds the MaxT and MaxR parameters. Thus, if a
+%% child exits, it is restarted as normal. If it exits sufficiently
+%% quickly and often to exceed the boundaries set by the MaxT and
+%% MaxR parameters, and a Delay is specified, then rather than
+%% stopping the supervisor, the supervisor instead continues and
+%% tries to start up the child again, Delay seconds later.
%%
%% Note that you can never restart more frequently than the MaxT
%% and MaxR parameters allow: i.e. you must wait until *both* the
@@ -31,6 +31,16 @@
%% the MaxT and MaxR parameters to permit the child to be
%% restarted. This may require waiting for longer than Delay.
%%
+%% Sometimes, you may wish for a transient or intrinsic child to
+%% exit abnormally so that it gets restarted, but still log
+%% nothing. gen_server will log any exit reason other than
+%% 'normal', 'shutdown' or {'shutdown', _}. Thus the exit reason of
+%% {'shutdown', 'restart'} is interpreted to mean you wish the
+%% child to be restarted according to the delay parameters, but
+%% gen_server will not log the error. Thus from gen_server's
+%% perspective it's a normal exit, whilst from supervisor's
+%% perspective, it's an abnormal exit.
+%%
%% 4) Added an 'intrinsic' restart type. Like the transient type, this
%% type means the child should only be restarted if the child exits
%% abnormally. Unlike the transient type, if the child exits
@@ -529,25 +539,23 @@ restart_child(Pid, Reason, State) ->
{ok, State}
end.
-do_restart({RestartType, Delay}, Reason, Child, State) ->
- case restart1(Child, State) of
- {ok, NState} ->
- {ok, NState};
- {terminate, NState} ->
- _TRef = erlang:send_after(trunc(Delay*1000), self(),
- {delayed_restart,
- {{RestartType, Delay}, Reason, Child}}),
- {ok, state_del_child(Child, NState)}
- end;
+do_restart({permanent = RestartType, Delay}, Reason, Child, State) ->
+ do_restart_delay({RestartType, Delay}, Reason, Child, State);
do_restart(permanent, Reason, Child, State) ->
report_error(child_terminated, Reason, Child, State#state.name),
restart(Child, State);
do_restart(Type, normal, Child, State) ->
del_child_and_maybe_shutdown(Type, Child, State);
+do_restart({RestartType, Delay}, {shutdown, restart} = Reason, Child, State)
+ when RestartType =:= transient orelse RestartType =:= intrinsic ->
+ do_restart_delay({RestartType, Delay}, Reason, Child, State);
do_restart(Type, {shutdown, _}, Child, State) ->
del_child_and_maybe_shutdown(Type, Child, State);
do_restart(Type, shutdown, Child = #child{child_type = supervisor}, State) ->
del_child_and_maybe_shutdown(Type, Child, State);
+do_restart({RestartType, Delay}, Reason, Child, State)
+ when RestartType =:= transient orelse RestartType =:= intrinsic ->
+ do_restart_delay({RestartType, Delay}, Reason, Child, State);
do_restart(Type, Reason, Child, State) when Type =:= transient orelse
Type =:= intrinsic ->
report_error(child_terminated, Reason, Child, State#state.name),
@@ -557,8 +565,21 @@ do_restart(temporary, Reason, Child, State) ->
NState = state_del_child(Child, State),
{ok, NState}.
+do_restart_delay({RestartType, Delay}, Reason, Child, State) ->
+ case restart1(Child, State) of
+ {ok, NState} ->
+ {ok, NState};
+ {terminate, NState} ->
+ _TRef = erlang:send_after(trunc(Delay*1000), self(),
+ {delayed_restart,
+ {{RestartType, Delay}, Reason, Child}}),
+ {ok, state_del_child(Child, NState)}
+ end.
+
del_child_and_maybe_shutdown(intrinsic, Child, State) ->
{shutdown, state_del_child(Child, State)};
+del_child_and_maybe_shutdown({intrinsic, _Delay}, Child, State) ->
+ {shutdown, state_del_child(Child, State)};
del_child_and_maybe_shutdown(_, Child, State) ->
{ok, state_del_child(Child, State)}.
@@ -911,7 +932,8 @@ supname(N,_) -> N.
%%% Func is {Mod, Fun, Args} == {atom, atom, list}
%%% RestartType is permanent | temporary | transient |
%%% intrinsic | {permanent, Delay} |
-%%% {transient, Delay} where Delay >= 0
+%%% {transient, Delay} | {intrinsic, Delay}
+%% where Delay >= 0
%%% Shutdown = integer() | infinity | brutal_kill
%%% ChildType = supervisor | worker
%%% Modules = [atom()] | dynamic
@@ -962,6 +984,7 @@ validRestartType(temporary) -> true;
validRestartType(transient) -> true;
validRestartType(intrinsic) -> true;
validRestartType({permanent, Delay}) -> validDelay(Delay);
+validRestartType({intrinsic, Delay}) -> validDelay(Delay);
validRestartType({transient, Delay}) -> validDelay(Delay);
validRestartType(RestartType) -> throw({invalid_restart_type,
RestartType}).
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index 43a6bc99..344196d7 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -69,13 +69,7 @@ handle_info({inet_async, LSock, Ref, {error, closed}},
handle_info({inet_async, LSock, Ref, {error, Reason}},
State=#state{sock=LSock, ref=Ref}) ->
- {AddressS, Port} = case inet:sockname(LSock) of
- {ok, {A, P}} -> {rabbit_misc:ntoab(A), P};
- {error, _} -> {"unknown", unknown}
- end,
- error_logger:error_msg("failed to accept TCP connection on ~s:~p: ~p~n",
- [AddressS, Port, Reason]),
- accept(State);
+ {stop, {accept_failed, Reason}, State};
handle_info(_Info, State) ->
{noreply, State}.