summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarek Majkowski <marek@rabbitmq.com>2011-01-24 12:36:45 +0000
committerMarek Majkowski <marek@rabbitmq.com>2011-01-24 12:36:45 +0000
commita96cfec75431e3574a45ec2eb98ada0b08d01834 (patch)
tree7f2f231d06fed5039445b77b67efe960d9e0f87e
parent7400ad54a3cf0832375a2437abe7ec3635f25c9c (diff)
parentcd9c9debea1972307c0678f80fa7007ea7551a92 (diff)
downloadrabbitmq-server-a96cfec75431e3574a45ec2eb98ada0b08d01834.tar.gz
bug23154 (ipv6) merged into default
-rw-r--r--docs/rabbitmqctl.1.xml10
-rw-r--r--include/rabbit_backing_queue_spec.hrl15
-rw-r--r--src/rabbit_amqqueue_process.erl46
-rw-r--r--src/rabbit_channel.erl78
-rw-r--r--src/rabbit_control.erl32
-rw-r--r--src/rabbit_variable_queue.erl19
6 files changed, 117 insertions, 83 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 2152cab3..bd9fee7d 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1191,6 +1191,16 @@
messages to the channel's consumers.
</para></listitem>
</varlistentry>
+ <varlistentry>
+ <term>confirm</term>
+ <listitem><para>True if the channel is in confirm mode, false otherwise.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>messages_unconfirmed</term>
+ <listitem><para>Number of published messages not yet
+ confirmed. On channels not in confirm mode, this
+ remains 0.</para></listitem>
+ </varlistentry>
</variablelist>
<para>
If no <command>channelinfoitem</command>s are specified then pid,
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 296bfdb3..accb2c0e 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -14,14 +14,13 @@
%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
%%
--type(fetch_result() ::
+-type(fetch_result(Ack) ::
('empty' |
%% Message, IsDelivered, AckTag, Remaining_Len
- {rabbit_types:basic_message(), boolean(), ack(), non_neg_integer()})).
+ {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})).
-type(is_durable() :: boolean()).
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
--type(ack_required() :: boolean()).
-type(confirm_required() :: boolean()).
-type(message_properties_transformer() ::
fun ((rabbit_types:message_properties())
@@ -36,13 +35,17 @@
-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
-spec(publish/3 :: (rabbit_types:basic_message(),
rabbit_types:message_properties(), state()) -> state()).
--spec(publish_delivered/4 :: (ack_required(), rabbit_types:basic_message(),
+-spec(publish_delivered/4 :: (true, rabbit_types:basic_message(),
rabbit_types:message_properties(), state())
- -> {ack(), state()}).
+ -> {ack(), state()};
+ (false, rabbit_types:basic_message(),
+ rabbit_types:message_properties(), state())
+ -> {undefined, state()}).
-spec(dropwhile/2 ::
(fun ((rabbit_types:message_properties()) -> boolean()), state())
-> state()).
--spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}).
+-spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()};
+ (false, state()) -> {fetch_result(undefined), state()}).
-spec(ack/2 :: ([ack()], state()) -> state()).
-spec(tx_publish/4 :: (rabbit_types:txn(), rabbit_types:basic_message(),
rabbit_types:message_properties(), state()) -> state()).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 663977ba..0346ec7d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -332,11 +332,6 @@ ch_record_state_transition(OldCR, NewCR) ->
true -> ok
end.
-record_current_channel_tx(ChPid, Txn) ->
- %% as a side effect this also starts monitoring the channel (if
- %% that wasn't happening already)
- store_ch_record((ch_record(ChPid))#cr{txn = Txn}).
-
deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
State = #q{q = #amqqueue{name = QName},
active_consumers = ActiveConsumers,
@@ -495,7 +490,7 @@ attempt_delivery(#delivery{txn = Txn,
{NeedsConfirming,
State = #q{backing_queue = BQ,
backing_queue_state = BQS}}) ->
- record_current_channel_tx(ChPid, Txn),
+ store_ch_record((ch_record(ChPid))#cr{txn = Txn}),
{true,
NeedsConfirming,
State#q{backing_queue_state =
@@ -591,7 +586,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
true -> {stop, State1};
false -> State2 = case Txn of
none -> State1;
- _ -> rollback_transaction(Txn, ChPid,
+ _ -> rollback_transaction(Txn, C,
State1)
end,
{ok, requeue_and_run(sets:to_list(ChAckTags),
@@ -627,26 +622,23 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
run_message_queue(
confirm_messages(Guids, State#q{backing_queue_state = BQS1})).
-commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
- backing_queue_state = BQS,
- ttl = TTL}) ->
- {AckTags, BQS1} = BQ:tx_commit(Txn,
- fun () -> gen_server2:reply(From, ok) end,
- reset_msg_expiry_fun(TTL),
- BQS),
- %% ChPid must be known here because of the participant management
- %% by the channel.
- C = #cr{acktags = ChAckTags} = lookup_ch(ChPid),
+commit_transaction(Txn, From, C = #cr{acktags = ChAckTags},
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ ttl = TTL}) ->
+ {AckTags, BQS1} = BQ:tx_commit(
+ Txn, fun () -> gen_server2:reply(From, ok) end,
+ reset_msg_expiry_fun(TTL), BQS),
ChAckTags1 = subtract_acks(ChAckTags, AckTags),
maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}),
State#q{backing_queue_state = BQS1}.
-rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+rollback_transaction(Txn, C, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
{_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS),
%% Iff we removed acktags from the channel record on ack+txn then
- %% we would add them back in here (would also require ChPid)
- record_current_channel_tx(ChPid, none),
+ %% we would add them back in here.
+ maybe_store_ch_record(C#cr{txn = none}),
State#q{backing_queue_state = BQS1}.
subtract_acks(A, B) when is_list(B) ->
@@ -848,8 +840,11 @@ handle_call({deliver, Delivery}, From, State) ->
noreply(NewState);
handle_call({commit, Txn, ChPid}, From, State) ->
- NewState = commit_transaction(Txn, From, ChPid, State),
- noreply(run_message_queue(NewState));
+ case lookup_ch(ChPid) of
+ not_found -> reply(ok, State);
+ C -> noreply(run_message_queue(
+ commit_transaction(Txn, From, C, State)))
+ end;
handle_call({notify_down, ChPid}, _From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
@@ -1048,7 +1043,10 @@ handle_cast({reject, AckTags, Requeue, ChPid},
end;
handle_cast({rollback, Txn, ChPid}, State) ->
- noreply(rollback_transaction(Txn, ChPid, State));
+ noreply(case lookup_ch(ChPid) of
+ not_found -> State;
+ C -> rollback_transaction(Txn, C, State)
+ end);
handle_cast(delete_immediately, State) ->
{stop, normal, State};
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b92206ad..91559ea6 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -41,8 +41,10 @@
-define(STATISTICS_KEYS,
[pid,
transactional,
+ confirm,
consumer_count,
messages_unacknowledged,
+ messages_unconfirmed,
acks_uncommitted,
prefetch_count,
client_flow_blocked]).
@@ -280,12 +282,12 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
%% TODO: this does a complete scan and partial rebuild of the
%% tree, which is quite efficient. To do better we'd need to
%% maintain a secondary mapping, from QPids to MsgSeqNos.
- {MsgSeqNos, UC1} = remove_queue_unconfirmed(
- gb_trees:next(gb_trees:iterator(UC)), QPid,
- {[], UC}),
+ {MXs, UC1} = remove_queue_unconfirmed(
+ gb_trees:next(gb_trees:iterator(UC)), QPid,
+ {[], UC}, State),
erase_queue_stats(QPid),
- noreply(queue_blocked(QPid, record_confirms(MsgSeqNos,
- State#ch{unconfirmed = UC1}))).
+ noreply(
+ queue_blocked(QPid, record_confirms(MXs, State#ch{unconfirmed = UC1}))).
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
@@ -471,38 +473,42 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-remove_queue_unconfirmed(none, _QPid, Acc) ->
+remove_queue_unconfirmed(none, _QPid, Acc, _State) ->
Acc;
-remove_queue_unconfirmed({MsgSeqNo, Qs, Next}, QPid, Acc) ->
+remove_queue_unconfirmed({MsgSeqNo, XQ, Next}, QPid, Acc, State) ->
remove_queue_unconfirmed(gb_trees:next(Next), QPid,
- remove_qmsg(MsgSeqNo, QPid, Qs, Acc)).
+ remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State),
+ State).
-record_confirm(undefined, State) -> State;
-record_confirm(MsgSeqNo, State) -> record_confirms([MsgSeqNo], State).
+record_confirm(undefined, _, State) ->
+ State;
+record_confirm(MsgSeqNo, XName, State) ->
+ record_confirms([{MsgSeqNo, XName}], State).
record_confirms([], State) ->
State;
-record_confirms(MsgSeqNos, State = #ch{confirmed = C}) ->
- State#ch{confirmed = [MsgSeqNos | C]}.
+record_confirms(MXs, State = #ch{confirmed = C}) ->
+ State#ch{confirmed = [MXs | C]}.
confirm([], _QPid, State) ->
State;
confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
- {DoneMessages, UC2} =
+ {MXs, UC1} =
lists:foldl(
fun(MsgSeqNo, {_DMs, UC0} = Acc) ->
case gb_trees:lookup(MsgSeqNo, UC0) of
none -> Acc;
- {value, Qs} -> remove_qmsg(MsgSeqNo, QPid, Qs, Acc)
+ {value, XQ} -> remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State)
end
end, {[], UC}, MsgSeqNos),
- record_confirms(DoneMessages, State#ch{unconfirmed = UC2}).
+ record_confirms(MXs, State#ch{unconfirmed = UC1}).
-remove_qmsg(MsgSeqNo, QPid, Qs, {MsgSeqNos, UC}) ->
+remove_qmsg(MsgSeqNo, QPid, {XName, Qs}, {MXs, UC}, State) ->
Qs1 = sets:del_element(QPid, Qs),
+ maybe_incr_stats([{{QPid, XName}, 1}], confirm, State),
case sets:size(Qs1) of
- 0 -> {[MsgSeqNo | MsgSeqNos], gb_trees:delete(MsgSeqNo, UC)};
- _ -> {MsgSeqNos, gb_trees:update(MsgSeqNo, Qs1, UC)}
+ 0 -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UC)};
+ _ -> {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UC)}
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
@@ -555,7 +561,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
Exchange,
rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message,
MsgSeqNo)),
- State2 = process_routing_result(RoutingRes, DeliveredQPids,
+ State2 = process_routing_result(RoutingRes, DeliveredQPids, ExchangeName,
MsgSeqNo, Message, State1),
maybe_incr_stats([{ExchangeName, 1} |
[{{QPid, ExchangeName}, 1} ||
@@ -1222,20 +1228,20 @@ is_message_persistent(Content) ->
IsPersistent
end.
-process_routing_result(unroutable, _, MsgSeqNo, Message, State) ->
- ok = basic_return(Message, State#ch.writer_pid, no_route),
- record_confirm(MsgSeqNo, State);
-process_routing_result(not_delivered, _, MsgSeqNo, Message, State) ->
- ok = basic_return(Message, State#ch.writer_pid, no_consumers),
- record_confirm(MsgSeqNo, State);
-process_routing_result(routed, [], MsgSeqNo, _, State) ->
- record_confirm(MsgSeqNo, State);
-process_routing_result(routed, _, undefined, _, State) ->
+process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
+ ok = basic_return(Msg, State#ch.writer_pid, no_route),
+ record_confirm(MsgSeqNo, XName, State);
+process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) ->
+ ok = basic_return(Msg, State#ch.writer_pid, no_consumers),
+ record_confirm(MsgSeqNo, XName, State);
+process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
+ record_confirm(MsgSeqNo, XName, State);
+process_routing_result(routed, _, _, undefined, _, State) ->
State;
-process_routing_result(routed, QPids, MsgSeqNo, _, State) ->
+process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
#ch{unconfirmed = UC} = State,
[maybe_monitor(QPid) || QPid <- QPids],
- UC1 = gb_trees:insert(MsgSeqNo, sets:from_list(QPids), UC),
+ UC1 = gb_trees:insert(MsgSeqNo, {XName, sets:from_list(QPids)}, UC),
State#ch{unconfirmed = UC1}.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
@@ -1244,8 +1250,11 @@ lock_message(false, _MsgStruct, State) ->
State.
send_confirms(State = #ch{confirmed = C}) ->
- send_confirms(lists:append(C), State #ch{confirmed = []}).
-
+ C1 = lists:append(C),
+ MsgSeqNos = [ begin maybe_incr_stats([{ExchangeName, 1}], confirm, State),
+ MsgSeqNo
+ end || {MsgSeqNo, ExchangeName} <- C1 ],
+ send_confirms(MsgSeqNos, State #ch{confirmed = []}).
send_confirms([], State) ->
State;
send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) ->
@@ -1255,7 +1264,7 @@ send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
SCs = lists:usort(Cs),
CutOff = case gb_trees:is_empty(UC) of
true -> lists:last(SCs) + 1;
- false -> {SeqNo, _Qs} = gb_trees:smallest(UC), SeqNo
+ false -> {SeqNo, _XQ} = gb_trees:smallest(UC), SeqNo
end,
{Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs),
case Ms of
@@ -1283,8 +1292,11 @@ i(number, #ch{channel = Channel}) -> Channel;
i(user, #ch{user = User}) -> User#user.username;
i(vhost, #ch{virtual_host = VHost}) -> VHost;
i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none;
+i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);
+i(messages_unconfirmed, #ch{unconfirmed = UC}) ->
+ gb_trees:size(UC);
i(messages_unacknowledged, #ch{unacked_message_q = UAMQ,
uncommitted_ack_q = UAQ}) ->
queue:len(UAMQ) + queue:len(UAQ);
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 55345a38..80483097 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -77,24 +77,24 @@ start() ->
true -> ok;
false -> io:format("...done.~n")
end,
- halt();
+ quit(0);
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
print_error("invalid command '~s'",
[string:join([atom_to_list(Command) | Args], " ")]),
usage();
{error, Reason} ->
print_error("~p", [Reason]),
- halt(2);
+ quit(2);
{badrpc, {'EXIT', Reason}} ->
print_error("~p", [Reason]),
- halt(2);
+ quit(2);
{badrpc, Reason} ->
print_error("unable to connect to node ~w: ~w", [Node, Reason]),
print_badrpc_diagnostics(Node),
- halt(2);
+ quit(2);
Other ->
print_error("~p", [Other]),
- halt(2)
+ quit(2)
end.
fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args).
@@ -130,7 +130,7 @@ stop() ->
usage() ->
io:format("~s", [rabbit_ctl_usage:usage()]),
- halt(1).
+ quit(1).
action(stop, Node, [], _Opts, Inform) ->
Inform("Stopping and halting node ~p", [Node]),
@@ -273,10 +273,13 @@ action(list_consumers, Node, _Args, Opts, Inform) ->
Inform("Listing consumers", []),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
InfoKeys = [queue_name, channel_pid, consumer_tag, ack_required],
- display_info_list(
- [lists:zip(InfoKeys, tuple_to_list(X)) ||
- X <- rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg])],
- InfoKeys);
+ case rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg]) of
+ L when is_list(L) -> display_info_list(
+ [lists:zip(InfoKeys, tuple_to_list(X)) ||
+ X <- L],
+ InfoKeys);
+ Other -> Other
+ end;
action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
@@ -390,3 +393,12 @@ prettify_typed_amqp_value(Type, Value) ->
array -> [prettify_typed_amqp_value(T, V) || {T, V} <- Value];
_ -> Value
end.
+
+% the slower shutdown on windows required to flush stdout
+quit(Status) ->
+ case os:type() of
+ {unix, _} ->
+ halt(Status);
+ {win32, _} ->
+ init:stop(Status)
+ end.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index f39bc964..7142d560 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -281,12 +281,11 @@
-record(sync, { acks_persistent, acks_all, pubs, funs }).
%% When we discover, on publish, that we should write some indices to
-%% disk for some betas, the RAM_INDEX_BATCH_SIZE sets the number of
-%% betas that we must be due to write indices for before we do any
-%% work at all. This is both a minimum and a maximum - we don't write
-%% fewer than RAM_INDEX_BATCH_SIZE indices out in one go, and we don't
-%% write more - we can always come back on the next publish to do
-%% more.
+%% disk for some betas, the IO_BATCH_SIZE sets the number of betas
+%% that we must be due to write indices for before we do any work at
+%% all. This is both a minimum and a maximum - we don't write fewer
+%% than IO_BATCH_SIZE indices out in one go, and we don't write more -
+%% we can always come back on the next publish to do more.
-define(IO_BATCH_SIZE, 64).
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
@@ -299,7 +298,7 @@
-type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
-type(seq_id() :: non_neg_integer()).
--type(ack() :: seq_id() | 'blank_ack').
+-type(ack() :: seq_id()).
-type(rates() :: #rates { egress :: {timestamp(), non_neg_integer()},
ingress :: {timestamp(), non_neg_integer()},
@@ -509,7 +508,7 @@ publish(Msg, MsgProps, State) ->
publish_delivered(false, #basic_message { guid = Guid },
_MsgProps, State = #vqstate { len = 0 }) ->
blind_confirm(self(), gb_sets:singleton(Guid)),
- {blank_ack, a(State)};
+ {undefined, a(State)};
publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
guid = Guid },
MsgProps = #message_properties {
@@ -628,7 +627,7 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
MsgStatus #msg_status {
is_delivered = true }, State),
{SeqId, StateN};
- false -> {blank_ack, State}
+ false -> {undefined, State}
end,
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
@@ -897,7 +896,7 @@ cons_if(true, E, L) -> [E | L];
cons_if(false, _E, L) -> L.
gb_sets_maybe_insert(false, _Val, Set) -> Set;
-%% when requeueing, we re-add a guid to the unconfimred set
+%% when requeueing, we re-add a guid to the unconfirmed set
gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid },