From a203ea9faf929414cb5ebe314d5358109e84830a Mon Sep 17 00:00:00 2001 From: Alexandru Scvortov Date: Wed, 29 Sep 2010 13:02:28 +0100 Subject: add confirm related info keys Added the following info_keys: - confirm :: The type of the channel. Is one of none, single, multiple. - unconfirmed :: The number of unconfirmed messages. --- src/rabbit_channel.erl | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 4bb1f13b..b558a0cc 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -57,8 +57,10 @@ -define(STATISTICS_KEYS, [pid, transactional, + confirm, consumer_count, messages_unacknowledged, + unconfirmed, acks_uncommitted, prefetch_count]). @@ -1288,8 +1290,16 @@ i(number, #ch{channel = Channel}) -> Channel; i(user, #ch{username = Username}) -> Username; i(vhost, #ch{virtual_host = VHost}) -> VHost; i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none; +i(confirm, #ch{confirm_enabled = CE, + confirm_multiple = CM}) -> case {CE, CM} of + {false, _} -> none; + {_, false} -> single; + {_, true} -> multiple + end; i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); +i(unconfirmed, #ch{need_confirming = NC}) -> + gb_sets:size(NC); i(messages_unacknowledged, #ch{unacked_message_q = UAMQ, uncommitted_ack_q = UAQ}) -> queue:len(UAMQ) + queue:len(UAQ); -- cgit v1.2.1 From 2a25a2be165e457c21d8ac085c1b08ca721a13ed Mon Sep 17 00:00:00 2001 From: Alexandru Scvortov Date: Thu, 30 Sep 2010 09:31:11 +0100 Subject: added fine grained stats Channel now emits the number of publisher acks (confirms) sent to the publisher. If this number is less than the number of publishes, it means that the broker isn't coping with the amount of incoming messages. --- src/rabbit_channel.erl | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b558a0cc..55d2a0b4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -464,6 +464,7 @@ send_or_enqueue_ack(_, State = #ch{confirm_enabled = false}) -> State; send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = false}) -> + maybe_incr_stats([{channel_stats, 1}], confirm, State), do_if_not_dup(MsgSeqNo, State, fun(MSN, S = #ch{writer_pid = WriterPid, qpid_to_msgs = QTM}) -> @@ -475,6 +476,7 @@ send_or_enqueue_ack(MsgSeqNo, end, QTM) } end); send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = true}) -> + maybe_incr_stats([{channel_stats, 1}], confirm, State), do_if_not_dup(MsgSeqNo, State, fun(MSN, S = #ch{qpid_to_msgs = QTM}) -> State1 = start_ack_timer(S), @@ -1322,6 +1324,8 @@ incr_stats({QPid, _} = QX, Inc, Measure) -> incr_stats(QPid, Inc, Measure) when is_pid(QPid) -> maybe_monitor(QPid), update_measures(queue_stats, QPid, Inc, Measure); +incr_stats(channel_stats, Inc, Measure) -> + update_measures(channel_stats, self(), Inc, Measure); incr_stats(X, Inc, Measure) -> update_measures(exchange_stats, X, Inc, Measure). @@ -1355,6 +1359,8 @@ internal_emit_stats(State = #ch{stats_timer = StatsTimer}) -> [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]}, {channel_exchange_stats, [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]}, + {channel_channel_stats, + [Stats || {{channel_stats, _}, Stats} <- get()]}, {channel_queue_exchange_stats, [{QX, Stats} || {{queue_exchange_stats, QX}, Stats} <- get()]}], -- cgit v1.2.1 From 61398037deb3e03dce203ff05f9fab3c6e05ae52 Mon Sep 17 00:00:00 2001 From: Alexandru Scvortov Date: Sun, 31 Oct 2010 13:25:42 +0000 Subject: update exchange and queue_exchange stats on confirms --- src/rabbit_channel.erl | 82 ++++++++++++++++++++++++++++++++------------------ 1 file changed, 52 insertions(+), 30 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 08fcd768..697e3d7f 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -50,7 +50,7 @@ username, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, confirm_enabled, published_count, confirm_multiple, confirm_tref, - held_confirms, unconfirmed, queues_for_msg}). + held_confirms, unconfirmed, queues_for_msg, exchange_for_msg}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -197,6 +197,7 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, confirm_multiple = false, held_confirms = gb_sets:new(), unconfirmed = gb_sets:new(), + exchange_for_msg = dict:new(), queues_for_msg = dict:new()}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, @@ -288,16 +289,19 @@ handle_cast(flush_multiple_acks, {noreply, State#ch{held_confirms = gb_sets:new(), confirm_tref = undefined}}; -handle_cast({confirm, MsgSeqNo, From}, State) -> - {noreply, send_or_enqueue_ack(MsgSeqNo, From, State)}. +handle_cast({confirm, MsgSeqNo, From}, + State = #ch{exchange_for_msg = EFM}) -> + {ok, ExchangeName} = dict:find(MsgSeqNo, EFM), + {noreply, send_or_enqueue_ack(MsgSeqNo, From, ExchangeName, State)}. handle_info({'DOWN', _MRef, process, QPid, _Reason}, - State = #ch{queues_for_msg = QFM}) -> + State = #ch{queues_for_msg = QFM, exchange_for_msg = EFM}) -> State1 = dict:fold( fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) -> Qs = sets:del_element(QPid, QPids), case sets:size(Qs) of - 0 -> send_or_enqueue_ack(Msg, QPid, State0); + 0 -> {ok, ExchangeName} = dict:find(Msg, EFM), + send_or_enqueue_ack(Msg, QPid, ExchangeName, State0); _ -> State0#ch{queues_for_msg = dict:store(Msg, Qs, QFM0)} end @@ -462,12 +466,13 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -send_or_enqueue_ack(undefined, _QPid, State) -> +send_or_enqueue_ack(undefined, _QPid, _EN, State) -> State; -send_or_enqueue_ack(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) -> +send_or_enqueue_ack(_MsgSeqNo, _QPid, _EN, State = #ch{confirm_enabled = false}) -> State; -send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) -> - maybe_incr_stats([{channel_stats, 1}], confirm, State), +send_or_enqueue_ack(MsgSeqNo, QPid, ExchangeName, + State = #ch{confirm_multiple = false}) -> + maybe_incr_confirm_stats(QPid, ExchangeName, State), do_if_unconfirmed( MsgSeqNo, QPid, fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> @@ -475,17 +480,26 @@ send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) -> WriterPid, #'basic.ack'{delivery_tag = MSN}), State1 end, State); -send_or_enqueue_ack(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) -> - maybe_incr_stats([{channel_stats, 1}], confirm, State), +send_or_enqueue_ack(MsgSeqNo, QPid, ExchangeName, + State = #ch{confirm_multiple = true}) -> + maybe_incr_confirm_stats(QPid, ExchangeName, State), do_if_unconfirmed( MsgSeqNo, QPid, fun(MSN, State1 = #ch{held_confirms = As}) -> start_ack_timer(State1#ch{held_confirms = gb_sets:add(MSN, As)}) end, State). +maybe_incr_confirm_stats(QPid, ExchangeName, State) -> + maybe_incr_stats([{ExchangeName, 1}], confirm, State), + case QPid of + undefined -> ok; + _ -> maybe_incr_stats({{QPid, ExchangeName}, 1}, confirm, State) + end. + do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun, - State = #ch{unconfirmed = UC, - queues_for_msg = QFM}) -> + State = #ch{unconfirmed = UC, + queues_for_msg = QFM, + exchange_for_msg = EFM}) -> %% clears references to MsgSeqNo and does ConfirmFun case gb_sets:is_element(MsgSeqNo, UC) of true -> @@ -502,6 +516,8 @@ do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun, State#ch{ queues_for_msg = dict:erase(MsgSeqNo, QFM), + exchange_for_msg = + dict:erase(MsgSeqNo, EFM), unconfirmed = gb_sets:delete(MsgSeqNo, UC)}); _ -> State#ch{queues_for_msg = @@ -557,6 +573,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, content = DecodedContent, guid = rabbit_guid:guid(), is_persistent = IsPersistent}, + io:format("publishing ~p to ~p (~p)~n", [MsgSeqNo, ExchangeName, IsPersistent]), {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, @@ -565,7 +582,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, true -> MsgSeqNo; false -> undefined end)), - State2 = process_routing_result(RoutingRes, DeliveredQPids, IsPersistent, + State2 = process_routing_result(RoutingRes, DeliveredQPids, ExchangeName, + IsPersistent, MsgSeqNo, Message, State1), maybe_incr_stats([{ExchangeName, 1} | [{{QPid, ExchangeName}, 1} || @@ -1249,23 +1267,31 @@ is_message_persistent(Content) -> IsPersistent end. -process_routing_result(unroutable, _, _, MsgSeqNo, Message, State) -> +process_routing_result(unroutable, _QPids, ExchangeName, _Persistent, MsgSeqNo, + Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_route), - send_or_enqueue_ack(MsgSeqNo, undefined, State); -process_routing_result(not_delivered, _, _, MsgSeqNo, Message, State) -> + send_or_enqueue_ack(MsgSeqNo, undefined, ExchangeName, State); +process_routing_result(not_delivered, _QPids, ExchangeName, _Persistent, MsgSeqNo, + Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_consumers), - send_or_enqueue_ack(MsgSeqNo, undefined, State); -process_routing_result(routed, [], _, MsgSeqNo, _, State) -> - send_or_enqueue_ack(MsgSeqNo, undefined, State); -process_routing_result(routed, _, _, undefined, _, State) -> + send_or_enqueue_ack(MsgSeqNo, undefined, ExchangeName, State); +process_routing_result(routed, [], ExchangeName, _Persistent, MsgSeqNo, + _Msg, State) -> + send_or_enqueue_ack(MsgSeqNo, undefined, ExchangeName, State); +process_routing_result(routed, _QPids, _EN, _Persistent, undefined, + _Msg, State) -> State; -process_routing_result(routed, _, false, MsgSeqNo, _, State) -> - send_or_enqueue_ack(MsgSeqNo, undefined, State); -process_routing_result(routed, QPids, true, MsgSeqNo, _, - State = #ch{queues_for_msg = QFM}) -> +process_routing_result(routed, _QPids, ExchangeName, false, MsgSeqNo, + _Msg, State) -> + send_or_enqueue_ack(MsgSeqNo, undefined, ExchangeName, State); +process_routing_result(routed, QPids, ExchangeName, true, MsgSeqNo, + _Msg, State = #ch{queues_for_msg = QFM, + exchange_for_msg = EFM}) -> + EFM1 = dict:store(MsgSeqNo, ExchangeName, EFM), + io:format("Msg -> X: ~p -> ~p~n", [MsgSeqNo, ExchangeName]), QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM), [maybe_monitor(QPid) || QPid <- QPids], - State#ch{queues_for_msg = QFM1}. + State#ch{queues_for_msg = QFM1, exchange_for_msg = EFM1}. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; @@ -1335,8 +1361,6 @@ incr_stats({QPid, _} = QX, Inc, Measure) -> incr_stats(QPid, Inc, Measure) when is_pid(QPid) -> maybe_monitor(QPid), update_measures(queue_stats, QPid, Inc, Measure); -incr_stats(channel_stats, Inc, Measure) -> - update_measures(channel_stats, self(), Inc, Measure); incr_stats(X, Inc, Measure) -> update_measures(exchange_stats, X, Inc, Measure). @@ -1370,8 +1394,6 @@ internal_emit_stats(State = #ch{stats_timer = StatsTimer}) -> [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]}, {channel_exchange_stats, [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]}, - {channel_channel_stats, - [Stats || {{channel_stats, _}, Stats} <- get()]}, {channel_queue_exchange_stats, [{QX, Stats} || {{queue_exchange_stats, QX}, Stats} <- get()]}], -- cgit v1.2.1 From 99199ea02573ce25d1f058eef75f4919eb7ddb4d Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Wed, 24 Nov 2010 11:56:46 +0000 Subject: Slower rabbitmqctl shutdown on windows flushes output --- src/rabbit_control.erl | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 72b77b1f..ae10712e 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -91,24 +91,24 @@ start() -> true -> ok; false -> io:format("...done.~n") end, - halt(); + quit(0); {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> print_error("invalid command '~s'", [string:join([atom_to_list(Command) | Args], " ")]), usage(); {error, Reason} -> print_error("~p", [Reason]), - halt(2); + quit(2); {badrpc, {'EXIT', Reason}} -> print_error("~p", [Reason]), - halt(2); + quit(2); {badrpc, Reason} -> print_error("unable to connect to node ~w: ~w", [Node, Reason]), print_badrpc_diagnostics(Node), - halt(2); + quit(2); Other -> print_error("~p", [Other]), - halt(2) + quit(2) end. fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args). @@ -140,7 +140,7 @@ stop() -> usage() -> io:format("~s", [rabbit_ctl_usage:usage()]), - halt(1). + quit(1). action(stop, Node, [], _Opts, Inform) -> Inform("Stopping and halting node ~p", [Node]), @@ -395,3 +395,11 @@ prettify_typed_amqp_value(Type, Value) -> array -> [prettify_typed_amqp_value(T, V) || {T, V} <- Value]; _ -> Value end. + +quit(Status) -> + case os:type() of + {unix, _} -> + halt(Status); + {win32, _} -> + init:stop(Status) + end. -- cgit v1.2.1 From 5f7240e024f5566f8e62beef4a78f0cdfdc34990 Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Wed, 24 Nov 2010 12:33:50 +0000 Subject: Add helpful comment about windows flushing to rabbitmqctl --- src/rabbit_control.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index ae10712e..f05aaedf 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -396,6 +396,7 @@ prettify_typed_amqp_value(Type, Value) -> _ -> Value end. +% the slower shutdown on windows required to flush stdout quit(Status) -> case os:type() of {unix, _} -> -- cgit v1.2.1 From c8d85b08d02b6c96419fff88eb5ea77bd0f8f3bc Mon Sep 17 00:00:00 2001 From: Alexandru Scvortov Date: Mon, 6 Dec 2010 10:48:30 +0000 Subject: add stats for confirms To recap, a published message is confirmed by the channel. A message is confirmed only after all the queues it was published to confirm it. With the current change, the emitted stats look like this: {channel_exchange_stats, [{{resource,<<"/">>,exchange,<<"direct">>}, [{confirm,545},{publish,545}]}]}, {channel_queue_exchange_stats, [{{<0.204.0>,{resource,<<"/">>,exchange,<<"direct">>}}, [{confirm,545},{publish,545}]}, {{<0.195.0>,{resource,<<"/">>,exchange,<<"direct">>}}, [{confirm,545},{publish,545}]}]}] The confirm field in channel_exchange_stats represents the number of messages sent to that exchange that have also been confirmed. If the exchanged routed the message to different queues, this number is only increased when all queues have confirmed the message. If the message was unroutable or was routed to 0 queues, this number is still increased. This is the number of basic.confirms sent back to publisher. The confirm field in channel_queue_exchange_stats represents the number of messages confirmed by that queue (but not necessarily confirmed by the channel). In channel_exchange_stats, if the number of confirms lags behind the number of publishes, one of the queues is not confirming messages in a timely fashion. --- src/rabbit_channel.erl | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ef85c318..e0f6f0e2 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -467,29 +467,30 @@ send_or_enqueue_ack(_MsgSeqNo, _QPid, _EN, State = #ch{confirm_enabled = false}) State; send_or_enqueue_ack(MsgSeqNo, QPid, ExchangeName, State = #ch{confirm_multiple = false}) -> - maybe_incr_confirm_stats(QPid, ExchangeName, State), + maybe_incr_confirm_queue_stats(QPid, ExchangeName, State), do_if_unconfirmed( MsgSeqNo, QPid, fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command( WriterPid, #'basic.ack'{delivery_tag = MSN}), + maybe_incr_stats([{ExchangeName, 1}], confirm, State1), State1 end, State); send_or_enqueue_ack(MsgSeqNo, QPid, ExchangeName, State = #ch{confirm_multiple = true}) -> - maybe_incr_confirm_stats(QPid, ExchangeName, State), + maybe_incr_confirm_queue_stats(QPid, ExchangeName, State), do_if_unconfirmed( MsgSeqNo, QPid, fun(MSN, State1 = #ch{held_confirms = As}) -> + maybe_incr_stats([{ExchangeName, 1}], confirm, State1), start_confirm_timer( State1#ch{held_confirms = gb_sets:add(MSN, As)}) end, State). -maybe_incr_confirm_stats(QPid, ExchangeName, State) -> - maybe_incr_stats([{ExchangeName, 1}], confirm, State), +maybe_incr_confirm_queue_stats(QPid, ExchangeName, State) -> case QPid of undefined -> ok; - _ -> maybe_incr_stats({{QPid, ExchangeName}, 1}, confirm, State) + _ -> maybe_incr_stats([{{QPid, ExchangeName}, 1}], confirm, State) end. do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun, @@ -568,7 +569,6 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, content = DecodedContent, guid = rabbit_guid:guid(), is_persistent = IsPersistent}, - io:format("publishing ~p to ~p (~p)~n", [MsgSeqNo, ExchangeName, IsPersistent]), {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, @@ -579,7 +579,6 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, maybe_incr_stats([{ExchangeName, 1} | [{{QPid, ExchangeName}, 1} || QPid <- DeliveredQPids]], publish, State2), - io:format("did~n"), {noreply, case TxnKey of none -> State2; _ -> add_tx_participants(DeliveredQPids, State2) @@ -1256,7 +1255,6 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _Msg, State = #ch{queues_for_msg = QFM, exchange_for_msg = EFM}) -> EFM1 = dict:store(MsgSeqNo, XName, EFM), - io:format("Msg -> X: ~p -> ~p~n", [MsgSeqNo, XName]), QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM), [maybe_monitor(QPid) || QPid <- QPids], State#ch{queues_for_msg = QFM1, exchange_for_msg = EFM1}. @@ -1368,6 +1366,7 @@ internal_emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) -> {channel_queue_exchange_stats, [{QX, Stats} || {{queue_exchange_stats, QX}, Stats} <- get()]}], + io:format("Stats: ~p~n", [Extra ++ CoarseStats ++ FineStats]), rabbit_event:notify(channel_stats, Extra ++ CoarseStats ++ FineStats) end. -- cgit v1.2.1 From 53c5b3e08d7a21d6ca81bc6048476ffcf508618d Mon Sep 17 00:00:00 2001 From: Alexandru Scvortov Date: Mon, 6 Dec 2010 11:55:16 +0000 Subject: add missing case --- src/rabbit_channel.erl | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e0f6f0e2..aefcd4f0 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -289,8 +289,12 @@ handle_cast(flush_multiple_acks, State) -> handle_cast({confirm, MsgSeqNo, From}, State = #ch{exchange_for_msg = EFM}) -> - {ok, ExchangeName} = dict:find(MsgSeqNo, EFM), - {noreply, send_or_enqueue_ack(MsgSeqNo, From, ExchangeName, State)}. + State1 = case dict:find(MsgSeqNo, EFM) of + {ok, ExchangeName} -> + send_or_enqueue_ack(MsgSeqNo, From, ExchangeName, State); + _ -> State %% no entry in EFM means it's already been confirmed + end, + {noreply, State1}. handle_info({'DOWN', _MRef, process, QPid, _Reason}, State = #ch{queues_for_msg = QFM, exchange_for_msg = EFM}) -> @@ -1366,7 +1370,6 @@ internal_emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) -> {channel_queue_exchange_stats, [{QX, Stats} || {{queue_exchange_stats, QX}, Stats} <- get()]}], - io:format("Stats: ~p~n", [Extra ++ CoarseStats ++ FineStats]), rabbit_event:notify(channel_stats, Extra ++ CoarseStats ++ FineStats) end. -- cgit v1.2.1 From 4b1537735bf523649db72c42288ff43fe7939b57 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 8 Dec 2010 10:48:57 +0000 Subject: Cosmetic --- src/rabbit_channel.erl | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 11342c60..9f1f2974 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1294,12 +1294,9 @@ i(number, #ch{channel = Channel}) -> Channel; i(user, #ch{username = Username}) -> Username; i(vhost, #ch{virtual_host = VHost}) -> VHost; i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none; -i(confirm, #ch{confirm_enabled = CE, - confirm_multiple = CM}) -> case {CE, CM} of - {false, _} -> none; - {_, false} -> single; - {_, true} -> multiple - end; +i(confirm, #ch{confirm_enabled = false}) -> none; +i(confirm, #ch{confirm_multiple = false}) -> single; +i(confirm, _) -> multiple; i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); i(unconfirmed, #ch{unconfirmed = UC}) -> -- cgit v1.2.1 From 811092a488075ca9d5e35bcc5c208d57acbc1d2b Mon Sep 17 00:00:00 2001 From: Alexandru Scvortov Date: Thu, 9 Dec 2010 08:24:52 +0000 Subject: add stats again; update docs --- docs/rabbitmqctl.1.xml | 12 ++++++++++++ src/rabbit_channel.erl | 2 ++ 2 files changed, 14 insertions(+) diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 6b02abe4..b02ec613 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1134,6 +1134,18 @@ messages to the channel's consumers. + + confirm + Confirm mode for the channel. Either + none, single or + multiple. + + + unconfirmed + Number of published messages not yet + confirmed. On channels not in confirm mode, this + remains 0. + If no channelinfoitems are specified then pid, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 9f1f2974..7471115e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -474,6 +474,7 @@ send_or_enqueue_ack(MsgSeqNo, QPid, ExchangeName, maybe_incr_confirm_queue_stats(QPid, ExchangeName, State), do_if_unconfirmed(MsgSeqNo, QPid, fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> + maybe_incr_stats([{ExchangeName, 1}], confirm, State1), ok = rabbit_writer:send_command( WriterPid, #'basic.ack'{ delivery_tag = MSN}), @@ -484,6 +485,7 @@ send_or_enqueue_ack(MsgSeqNo, QPid, ExchangeName, maybe_incr_confirm_queue_stats(QPid, ExchangeName, State), do_if_unconfirmed(MsgSeqNo, QPid, fun(MSN, State1 = #ch{held_confirms = As}) -> + maybe_incr_stats([{ExchangeName, 1}], confirm, State1), start_confirm_timer( State1#ch{held_confirms = gb_sets:add(MSN, As)}) end, State). -- cgit v1.2.1 From f5ef17aac0a739db3519ade1f9a71bdc0e42bdc8 Mon Sep 17 00:00:00 2001 From: Alexandru Scvortov Date: Fri, 14 Jan 2011 10:15:47 +0000 Subject: for queues, log partial confirms rather than full confirms --- src/rabbit_channel.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 5bc04170..8c91e717 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -524,7 +524,7 @@ confirm(MsgSeqNos, QPid, XName, State) -> end end end, {[], State}, MsgSeqNos), - maybe_incr_stats([{{QPid, XName}, length(DoneMessages)}], confirm, State), + maybe_incr_stats([{{QPid, XName}, length(MsgSeqNos)}], confirm, State), send_confirms(DoneMessages, XName, State1). group_confirms_by_exchange([], Acc) -> @@ -1271,7 +1271,7 @@ send_confirms([], _, State) -> send_confirms([MsgSeqNo], XName, State = #ch{writer_pid = WriterPid}) -> send_confirm(MsgSeqNo, WriterPid), - maybe_incr_confirm_exchange_stats([MsgSeqNo], XName, State); + maybe_incr_confirm_exchange_stats_and_cleanup([MsgSeqNo], XName, State); send_confirms(Cs, XName, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> SCs = lists:usort(Cs), @@ -1287,9 +1287,9 @@ send_confirms(Cs, XName, multiple = true}) end, [ok = send_confirm(SeqNo, WriterPid) || SeqNo <- Ss], - maybe_incr_confirm_exchange_stats(Cs, XName, State). + maybe_incr_confirm_exchange_stats_and_cleanup(Cs, XName, State). -maybe_incr_confirm_exchange_stats(Cs, XName, State) -> +maybe_incr_confirm_exchange_stats_and_cleanup(Cs, XName, State) -> maybe_incr_stats([{XName, length(Cs)}], confirm, State), lists:foldl(fun(MsgSeqNo, State0 = #ch{exchange_for_msg = EFM}) -> State0#ch{exchange_for_msg = -- cgit v1.2.1 From 10b506a3a574638c5a177456df22a951f0981513 Mon Sep 17 00:00:00 2001 From: Alexandru Scvortov Date: Fri, 14 Jan 2011 10:53:01 +0000 Subject: don't do confirm stats book-keeping unless stats are enabled --- src/rabbit_channel.erl | 66 +++++++++++++++++++++++++++----------------------- 1 file changed, 36 insertions(+), 30 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 8c91e717..6d10e7ea 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -287,33 +287,33 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, hibernate}; -%% Add this case for when stats are disabled -%% handle_cast({confirm, MsgSeqNos, From}, State) -> -%% {noreply, confirm(MsgSeqNos, From, State)}. handle_cast({confirm, MsgSeqNos, From}, - State = #ch{exchange_for_msg = EFM}) -> - EMs = - lists:foldl( - fun(MsgSeqNo, EMs) -> - case dict:find(MsgSeqNo, EFM) of - {ok, XName} -> - [{XName, MsgSeqNo} | EMs]; - _ -> - EMs - end - end, [], MsgSeqNos), - State1 = - case lists:usort(EMs) of - [{XName, MsgSeqNo} | EMs1] -> + State= #ch{exchange_for_msg = EFM, stats_timer = StatsTimer}) -> + case rabbit_event:stats_level(StatsTimer) of + fine -> + EMs = lists:foldl( - fun({ExchangeName, MsgSeqNosE}, State0) -> - confirm(MsgSeqNosE, From, ExchangeName, State0) - end, State, - group_confirms_by_exchange(EMs1, [{XName, [MsgSeqNo]}])); - [] -> - State - end, - {noreply, State1}. + fun(MsgSeqNo, EMs) -> + case dict:find(MsgSeqNo, EFM) of + {ok, XName} -> [{XName, MsgSeqNo} | EMs]; + _ -> EMs + end + end, [], MsgSeqNos), + {noreply, + case lists:usort(EMs) of + [{XName, MsgSeqNo} | EMs1] -> + lists:foldl( + fun({XName1, MsgSeqNosE}, State0) -> + confirm(MsgSeqNosE, From, XName1, State0) + end, State, + group_confirms_by_exchange(EMs1, + [{XName, [MsgSeqNo]}])); + [] -> + State + end}; + _ -> + {noreply, confirm(MsgSeqNos, From, undefined, State)} + end. handle_info({'DOWN', _MRef, process, QPid, _Reason}, State = #ch{queues_for_msg = QFM, exchange_for_msg = EFM}) -> @@ -1242,11 +1242,11 @@ is_message_persistent(Content) -> IsPersistent end. -process_routing_result(unroutable, _, XName, MsgSeqNo, _, State) -> - ok = basic_return(MsgSeqNo, State#ch.writer_pid, no_route), +process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> + ok = basic_return(Msg, State#ch.writer_pid, no_route), send_confirms([MsgSeqNo], XName, State); -process_routing_result(not_delivered, _, XName, MsgSeqNo, _, State) -> - ok = basic_return(MsgSeqNo, State#ch.writer_pid, no_consumers), +process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) -> + ok = basic_return(Msg, State#ch.writer_pid, no_consumers), send_confirms([MsgSeqNo], XName, State); process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> send_confirms([MsgSeqNo], XName, State); @@ -1258,7 +1258,11 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> [maybe_monitor(QPid) || QPid <- QPids], State#ch{queues_for_msg = dict:store(MsgSeqNo, sets:from_list(QPids), QFM), - exchange_for_msg = dict:store(MsgSeqNo, XName, EFM), + exchange_for_msg = case XName of + undefined -> EFM; + _ -> dict:store(MsgSeqNo, + XName, EFM) + end, unconfirmed = gb_sets:add(MsgSeqNo, UC)}. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> @@ -1289,6 +1293,8 @@ send_confirms(Cs, XName, [ok = send_confirm(SeqNo, WriterPid) || SeqNo <- Ss], maybe_incr_confirm_exchange_stats_and_cleanup(Cs, XName, State). +maybe_incr_confirm_exchange_stats_and_cleanup(_, undefined, State) -> + State; maybe_incr_confirm_exchange_stats_and_cleanup(Cs, XName, State) -> maybe_incr_stats([{XName, length(Cs)}], confirm, State), lists:foldl(fun(MsgSeqNo, State0 = #ch{exchange_for_msg = EFM}) -> -- cgit v1.2.1 From d1e90b5374460a58581fb3886822ea95714a1cc5 Mon Sep 17 00:00:00 2001 From: Alexandru Scvortov Date: Fri, 14 Jan 2011 12:16:37 +0000 Subject: keep track of de-duplicated confirms --- src/rabbit_channel.erl | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 6d10e7ea..45e21a5a 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -504,27 +504,29 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> confirm([], _, _, State) -> State; confirm(MsgSeqNos, QPid, XName, State) -> - {DoneMessages, State1} = + {DoneMessages, UniqueSeqNos, State1} = lists:foldl( - fun(MsgSeqNo, {DMs, State0 = #ch{unconfirmed = UC0, - queues_for_msg = QFM0}}) -> + fun(MsgSeqNo, {DMs, USN, State0 = #ch{unconfirmed = UC0, + queues_for_msg = QFM0}}) -> case gb_sets:is_element(MsgSeqNo, UC0) of - false -> {DMs, State0}; + false -> {DMs, USN, State0}; true -> Qs1 = sets:del_element( QPid, dict:fetch(MsgSeqNo, QFM0)), case sets:size(Qs1) of 0 -> {[MsgSeqNo | DMs], + USN + 1, State0#ch{ queues_for_msg = dict:erase(MsgSeqNo, QFM0), unconfirmed = gb_sets:delete(MsgSeqNo, UC0)}}; _ -> QFM1 = dict:store(MsgSeqNo, Qs1, QFM0), - {DMs, State0#ch{queues_for_msg = QFM1}} + {DMs, USN + 1, + State0#ch{queues_for_msg = QFM1}} end end - end, {[], State}, MsgSeqNos), - maybe_incr_stats([{{QPid, XName}, length(MsgSeqNos)}], confirm, State), + end, {[], 0, State}, MsgSeqNos), + maybe_incr_stats([{{QPid, XName}, UniqueSeqNos}], confirm, State), send_confirms(DoneMessages, XName, State1). group_confirms_by_exchange([], Acc) -> -- cgit v1.2.1 From 2c56b0e6f768017b3ead54217a19cc3e158e141c Mon Sep 17 00:00:00 2001 From: Alexandru Scvortov Date: Fri, 14 Jan 2011 12:18:11 +0000 Subject: cosmetic --- src/rabbit_channel.erl | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 45e21a5a..28980b70 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1244,17 +1244,17 @@ is_message_persistent(Content) -> IsPersistent end. -process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> +process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State#ch.writer_pid, no_route), send_confirms([MsgSeqNo], XName, State); -process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) -> +process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State#ch.writer_pid, no_consumers), send_confirms([MsgSeqNo], XName, State); -process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> +process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> send_confirms([MsgSeqNo], XName, State); -process_routing_result(routed, _, _, undefined, _, State) -> +process_routing_result(routed, _, _, undefined, _, State) -> State; -process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> +process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> #ch{queues_for_msg = QFM, unconfirmed = UC, exchange_for_msg = EFM} = State, [maybe_monitor(QPid) || QPid <- QPids], -- cgit v1.2.1 From 0645b79f2db3d705513eda532a5923350452ba27 Mon Sep 17 00:00:00 2001 From: Alexandru Scvortov Date: Tue, 18 Jan 2011 10:59:31 +0000 Subject: stats for confirms work again --- src/rabbit_channel.erl | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f2b74dd1..083a1313 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -301,12 +301,12 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, {[], UC}), State1 = case lists:usort(EMs) of [] -> State; - [{XName, [MsgSeqNo]} | EMs1] -> + [{XName, MsgSeqNo} | EMs1] -> EMs2 = group_confirms_by_exchange(EMs1, [{XName, [MsgSeqNo]}]), - lists:fold(fun({XName1, MsgSeqNos}, State0) -> - send_confirms(MsgSeqNos, XName1, State0) - end, State#ch{unconfirmed = UC1}, EMs2) + lists:foldl(fun({XName1, MsgSeqNos}, State0) -> + send_confirms(MsgSeqNos, XName1, State0) + end, State#ch{unconfirmed = UC1}, EMs2) end, erase_queue_stats(QPid), {noreply, queue_blocked(QPid, State1), hibernate}. @@ -494,7 +494,7 @@ group_and_confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> EMs = lists:foldl( fun(MsgSeqNo, EMs) -> case gb_trees:lookup(MsgSeqNo, UC) of - {value, {_, XName}} -> [{MsgSeqNo, XName} | EMs]; + {value, {_, XName}} -> [{XName, MsgSeqNo} | EMs]; none -> EMs end end, [], MsgSeqNos), @@ -519,23 +519,25 @@ group_confirms_by_exchange([{E, Msg1} | EMs], Acc) -> confirm([], _QPid, _XName, State) -> State; confirm(MsgSeqNos, QPid, XName, State = #ch{unconfirmed = UC}) -> - {{DoneMessages, UC1}, UniqueSeqNos} = + {{EMs, UC1}, UniqueSeqNos} = lists:foldl( fun(MsgSeqNo, {{_DMs, UC0} = Acc, USN}) -> case gb_trees:lookup(MsgSeqNo, UC0) of - none -> Acc; + none -> {Acc, USN}; {value, Qs} -> {remove_qmsg(MsgSeqNo, QPid, Qs, Acc), USN + 1} end end, {{[], UC}, 0}, MsgSeqNos), + DoneMessages = [MsgSeqNo || {_XName, MsgSeqNo} <- EMs], maybe_incr_stats([{{QPid, XName}, UniqueSeqNos}], confirm, State), send_confirms(DoneMessages, XName, State#ch{unconfirmed = UC1}). -remove_qmsg(MsgSeqNo, QPid, {Qs, XName}, {MsgSeqNos, UC}) -> +remove_qmsg(MsgSeqNo, QPid, {Qs, XName}, {XMs, UC}) -> + %% remove QPid from MsgSeqNo's mapping Qs1 = sets:del_element(QPid, Qs), case sets:size(Qs1) of - 0 -> {[{XName, MsgSeqNo} | MsgSeqNos], gb_trees:delete(MsgSeqNo, UC)}; - _ -> {MsgSeqNos, gb_trees:update(MsgSeqNo, {Qs1, XName}, UC)} + 0 -> {[{XName, MsgSeqNo} | XMs], gb_trees:delete(MsgSeqNo, UC)}; + _ -> {XMs, gb_trees:update(MsgSeqNo, {Qs1, XName}, UC)} end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> @@ -1272,7 +1274,8 @@ send_confirms([], _, State) -> send_confirms([MsgSeqNo], XName, State = #ch{writer_pid = WriterPid}) -> send_confirm(MsgSeqNo, WriterPid), - maybe_incr_stats([{XName, 1}], confirm, State); + maybe_incr_stats([{XName, 1}], confirm, State), + State; send_confirms(Cs, XName, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> SCs = lists:usort(Cs), @@ -1288,7 +1291,8 @@ send_confirms(Cs, XName, multiple = true}) end, [ok = send_confirm(SeqNo, WriterPid) || SeqNo <- Ss], - maybe_incr_stats([{XName, length(Cs)}], confirm, State). + maybe_incr_stats([{XName, length(Cs)}], confirm, State), + State. send_confirm(undefined, _WriterPid) -> ok; @@ -1376,6 +1380,7 @@ internal_emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) -> {channel_queue_exchange_stats, [{QX, Stats} || {{queue_exchange_stats, QX}, Stats} <- get()]}], + io:format("~p~n", [Extra ++ CoarseStats ++ FineStats]), rabbit_event:notify(channel_stats, Extra ++ CoarseStats ++ FineStats) end. -- cgit v1.2.1 From 83ebec1bcf772c48c786e354a0c03db16d664706 Mon Sep 17 00:00:00 2001 From: Alexandru Scvortov Date: Tue, 18 Jan 2011 11:51:55 +0000 Subject: merge duplicate code --- src/rabbit_channel.erl | 57 +++++++++++++++++++++----------------------------- 1 file changed, 24 insertions(+), 33 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 083a1313..90fe230d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -288,7 +288,7 @@ handle_cast({confirm, MsgSeqNos, From}, State= #ch{stats_timer = StatsTimer}) -> case rabbit_event:stats_level(StatsTimer) of fine -> {noreply, group_and_confirm(MsgSeqNos, From, State)}; - _ -> {noreply, confirm(MsgSeqNos, From, undefined, State)} + _ -> {noreply, nogroup_confirm(MsgSeqNos, From, State)} end. handle_info({'DOWN', _MRef, process, QPid, _Reason}, @@ -299,15 +299,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, {EMs, UC1} = remove_queue_unconfirmed( gb_trees:next(gb_trees:iterator(UC)), QPid, {[], UC}), - State1 = case lists:usort(EMs) of - [] -> State; - [{XName, MsgSeqNo} | EMs1] -> - EMs2 = group_confirms_by_exchange(EMs1, - [{XName, [MsgSeqNo]}]), - lists:foldl(fun({XName1, MsgSeqNos}, State0) -> - send_confirms(MsgSeqNos, XName1, State0) - end, State#ch{unconfirmed = UC1}, EMs2) - end, + State1 = confirm_grouped(EMs, State#ch{unconfirmed = UC1}), erase_queue_stats(QPid), {noreply, queue_blocked(QPid, State1), hibernate}. @@ -490,19 +482,17 @@ remove_queue_unconfirmed({MsgSeqNo, QX, Next}, QPid, Acc) -> group_and_confirm([], _QPid, State) -> State; -group_and_confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> - EMs = lists:foldl( - fun(MsgSeqNo, EMs) -> - case gb_trees:lookup(MsgSeqNo, UC) of - {value, {_, XName}} -> [{XName, MsgSeqNo} | EMs]; - none -> EMs - end - end, [], MsgSeqNos), +group_and_confirm(MsgSeqNos, QPid, State) -> + {EMs, UC1} = + take_from_unconfirmed(MsgSeqNos, QPid, State), + confirm_grouped(EMs, State#ch{unconfirmed=UC1}). + +confirm_grouped(EMs, State) -> case lists:usort(EMs) of [{XName, MsgSeqNo} | EMs1] -> lists:foldl( fun({XName1, MsgSeqNosE}, State0) -> - confirm(MsgSeqNosE, QPid, XName1, State0) + send_confirms(MsgSeqNosE, XName1, State0) end, State, group_confirms_by_exchange(EMs1, [{XName, [MsgSeqNo]}])); [] -> @@ -516,21 +506,23 @@ group_confirms_by_exchange([{E, Msg1} | EMs], [{E, Msgs} | Acc]) -> group_confirms_by_exchange([{E, Msg1} | EMs], Acc) -> group_confirms_by_exchange(EMs, [{E, [Msg1]} | Acc]). -confirm([], _QPid, _XName, State) -> +nogroup_confirm([], _QPid, State) -> State; -confirm(MsgSeqNos, QPid, XName, State = #ch{unconfirmed = UC}) -> - {{EMs, UC1}, UniqueSeqNos} = - lists:foldl( - fun(MsgSeqNo, {{_DMs, UC0} = Acc, USN}) -> - case gb_trees:lookup(MsgSeqNo, UC0) of - none -> {Acc, USN}; - {value, Qs} -> {remove_qmsg(MsgSeqNo, QPid, Qs, Acc), - USN + 1} - end - end, {{[], UC}, 0}, MsgSeqNos), +nogroup_confirm(MsgSeqNos, QPid, State) -> + {EMs, UC1} = take_from_unconfirmed(MsgSeqNos, QPid, State), DoneMessages = [MsgSeqNo || {_XName, MsgSeqNo} <- EMs], - maybe_incr_stats([{{QPid, XName}, UniqueSeqNos}], confirm, State), - send_confirms(DoneMessages, XName, State#ch{unconfirmed = UC1}). + send_confirms(DoneMessages, undefined, State#ch{unconfirmed = UC1}). + +take_from_unconfirmed(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> + lists:foldl( + fun(MsgSeqNo, {_DMs, UC0} = Acc) -> + case gb_trees:lookup(MsgSeqNo, UC0) of + none -> Acc; + {value, {_, XName} = QX} -> + maybe_incr_stats([{{QPid, XName}, 1}], confirm, State), + remove_qmsg(MsgSeqNo, QPid, QX, Acc) + end + end, {[], UC}, MsgSeqNos). remove_qmsg(MsgSeqNo, QPid, {Qs, XName}, {XMs, UC}) -> %% remove QPid from MsgSeqNo's mapping @@ -1380,7 +1372,6 @@ internal_emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) -> {channel_queue_exchange_stats, [{QX, Stats} || {{queue_exchange_stats, QX}, Stats} <- get()]}], - io:format("~p~n", [Extra ++ CoarseStats ++ FineStats]), rabbit_event:notify(channel_stats, Extra ++ CoarseStats ++ FineStats) end. -- cgit v1.2.1 From 1169209a828469e61618278c83928bf989c020e8 Mon Sep 17 00:00:00 2001 From: Alexandru Scvortov Date: Wed, 19 Jan 2011 10:52:53 +0000 Subject: fix break when stats were off --- src/rabbit_channel.erl | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ca424335..6bbb0412 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1265,15 +1265,15 @@ lock_message(false, _MsgStruct, State) -> State. send_confirms(State = #ch{confirmed = C, stats_timer = StatsTimer}) -> + C1 = lists:append(C), MsgSeqNos = case rabbit_event:stats_level(StatsTimer) of - fine -> incr_confirm_exchange_stats(C, State); - _ -> [MsgSeqNo || {MsgSeqNo, _} <- C] + fine -> incr_confirm_exchange_stats(C1, State); + _ -> [MsgSeqNo || {MsgSeqNo, _} <- C1] end, send_confirms(MsgSeqNos, State #ch{confirmed = []}). send_confirms([], State) -> State; -send_confirms([MsgSeqNo], - State = #ch{writer_pid = WriterPid}) -> +send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) -> send_confirm(MsgSeqNo, WriterPid), State; send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> @@ -1297,7 +1297,7 @@ incr_confirm_exchange_stats(C, State) -> fun({MsgSeqNo, ExchangeName}, MsgSeqNos0) -> maybe_incr_stats([{ExchangeName, 1}], confirm, State), [MsgSeqNo | MsgSeqNos0] - end, [], lists:append(C)). + end, [], C). send_confirm(SeqNo, WriterPid) -> ok = rabbit_writer:send_command(WriterPid, -- cgit v1.2.1 From d1d2c8e8ee89cccdbe58f5365397829fa0b82c98 Mon Sep 17 00:00:00 2001 From: Alexandru Scvortov Date: Wed, 19 Jan 2011 15:23:05 +0000 Subject: swap tuple elements for readability; inline a function --- src/rabbit_channel.erl | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 6bbb0412..ca8571eb 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -488,11 +488,11 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -remove_queue_unconfirmed(none, _QX, Acc, _State) -> +remove_queue_unconfirmed(none, _XQ, Acc, _State) -> Acc; -remove_queue_unconfirmed({MsgSeqNo, QX, Next}, QPid, Acc, State) -> +remove_queue_unconfirmed({MsgSeqNo, XQ, Next}, QPid, Acc, State) -> remove_queue_unconfirmed(gb_trees:next(Next), QPid, - remove_qmsg(MsgSeqNo, QPid, QX, Acc, State), + remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State), State). record_confirm(undefined, _, State) -> @@ -512,18 +512,18 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> lists:foldl( fun(MsgSeqNo, {_DMs, UC0} = Acc) -> case gb_trees:lookup(MsgSeqNo, UC0) of - none -> Acc; - {value,QX} -> remove_qmsg(MsgSeqNo, QPid, QX, Acc, State) + none -> Acc; + {value, XQ} -> remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State) end end, {[], UC}, MsgSeqNos), record_confirms(MEs, State#ch{unconfirmed = UC1}). -remove_qmsg(MsgSeqNo, QPid, {Qs, XName}, {MEs, UC}, State) -> +remove_qmsg(MsgSeqNo, QPid, {XName, Qs}, {MEs, UC}, State) -> Qs1 = sets:del_element(QPid, Qs), maybe_incr_stats([{{QPid, XName}, 1}], confirm, State), case sets:size(Qs1) of 0 -> {[{MsgSeqNo, XName} | MEs], gb_trees:delete(MsgSeqNo, UC)}; - _ -> {MEs, gb_trees:update(MsgSeqNo, {Qs1, XName}, UC)} + _ -> {MEs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UC)} end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> @@ -1256,7 +1256,7 @@ process_routing_result(routed, _, _, undefined, _, State) -> process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> #ch{unconfirmed = UC} = State, [maybe_monitor(QPid) || QPid <- QPids], - UC1 = gb_trees:insert(MsgSeqNo, {sets:from_list(QPids), XName}, UC), + UC1 = gb_trees:insert(MsgSeqNo, {XName, sets:from_list(QPids)}, UC), State#ch{unconfirmed = UC1}. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> @@ -1267,8 +1267,15 @@ lock_message(false, _MsgStruct, State) -> send_confirms(State = #ch{confirmed = C, stats_timer = StatsTimer}) -> C1 = lists:append(C), MsgSeqNos = case rabbit_event:stats_level(StatsTimer) of - fine -> incr_confirm_exchange_stats(C1, State); - _ -> [MsgSeqNo || {MsgSeqNo, _} <- C1] + fine -> + lists:foldl( + fun({MsgSeqNo, ExchangeName}, MsgSeqNos0) -> + maybe_incr_stats([{ExchangeName, 1}], + confirm, State), + [MsgSeqNo | MsgSeqNos0] + end, [], C1); + _ -> + [MsgSeqNo || {MsgSeqNo, _} <- C1] end, send_confirms(MsgSeqNos, State #ch{confirmed = []}). send_confirms([], State) -> @@ -1280,7 +1287,7 @@ send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> SCs = lists:usort(Cs), CutOff = case gb_trees:is_empty(UC) of true -> lists:last(SCs) + 1; - false -> {SeqNo, _Qs} = gb_trees:smallest(UC), SeqNo + false -> {SeqNo, _XQ} = gb_trees:smallest(UC), SeqNo end, {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs), case Ms of @@ -1292,13 +1299,6 @@ send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> [ok = send_confirm(SeqNo, WriterPid) || SeqNo <- Ss], State. -incr_confirm_exchange_stats(C, State) -> - lists:foldl( - fun({MsgSeqNo, ExchangeName}, MsgSeqNos0) -> - maybe_incr_stats([{ExchangeName, 1}], confirm, State), - [MsgSeqNo | MsgSeqNos0] - end, [], C). - send_confirm(SeqNo, WriterPid) -> ok = rabbit_writer:send_command(WriterPid, #'basic.ack'{delivery_tag = SeqNo}). -- cgit v1.2.1 From bd75b2bede862225dc8f702bc79b969da442c48d Mon Sep 17 00:00:00 2001 From: Alexandru Scvortov Date: Wed, 19 Jan 2011 15:36:57 +0000 Subject: comprehensions are more concise --- src/rabbit_channel.erl | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ca8571eb..eab20469 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1268,12 +1268,10 @@ send_confirms(State = #ch{confirmed = C, stats_timer = StatsTimer}) -> C1 = lists:append(C), MsgSeqNos = case rabbit_event:stats_level(StatsTimer) of fine -> - lists:foldl( - fun({MsgSeqNo, ExchangeName}, MsgSeqNos0) -> - maybe_incr_stats([{ExchangeName, 1}], - confirm, State), - [MsgSeqNo | MsgSeqNos0] - end, [], C1); + [ begin maybe_incr_stats([{ExchangeName, 1}], + confirm, State), + MsgSeqNo + end || {MsgSeqNo, ExchangeName} <- C1]; _ -> [MsgSeqNo || {MsgSeqNo, _} <- C1] end, -- cgit v1.2.1 From e2645b89e948168c3c2d9595ec7f7097ad0fb3d0 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 20 Jan 2011 16:01:05 +0000 Subject: Deal with the possibility of a ch DOWN overtaking other messages from the channel --- src/rabbit_amqqueue_process.erl | 40 ++++++++++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 663977ba..48192dcb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -630,24 +630,36 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, backing_queue_state = BQS, ttl = TTL}) -> - {AckTags, BQS1} = BQ:tx_commit(Txn, - fun () -> gen_server2:reply(From, ok) end, - reset_msg_expiry_fun(TTL), - BQS), %% ChPid must be known here because of the participant management - %% by the channel. - C = #cr{acktags = ChAckTags} = lookup_ch(ChPid), - ChAckTags1 = subtract_acks(ChAckTags, AckTags), - maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}), - State#q{backing_queue_state = BQS1}. + %% by the channel. However, in a cluster, the DOWN can overtake + %% the commit, and so there is a case where handle_ch_down has + %% already been called for ChPid. + case lookup_ch(ChPid) of + not_found -> + gen_server2:reply(From, ok), + State; + C = #cr{acktags = ChAckTags} -> + {AckTags, BQS1} = BQ:tx_commit( + Txn, fun () -> gen_server2:reply(From, ok) end, + reset_msg_expiry_fun(TTL), BQS), + ChAckTags1 = subtract_acks(ChAckTags, AckTags), + maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}), + State#q{backing_queue_state = BQS1} + end. rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - {_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS), - %% Iff we removed acktags from the channel record on ack+txn then - %% we would add them back in here (would also require ChPid) - record_current_channel_tx(ChPid, none), - State#q{backing_queue_state = BQS1}. + case lookup_ch(ChPid) of + not_found -> + State; + #cr{} -> + {_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS), + %% Iff we removed acktags from the channel record on + %% ack+txn then we would add them back in here (would also + %% require ChPid) + record_current_channel_tx(ChPid, none), + State#q{backing_queue_state = BQS1} + end. subtract_acks(A, B) when is_list(B) -> lists:foldl(fun sets:del_element/2, A, B). -- cgit v1.2.1 From 1b384986f1a9bb660324077f63e8720f7dfffd40 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 20 Jan 2011 16:47:23 +0000 Subject: handle errors in list_consumers --- src/rabbit_control.erl | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 78391be2..4228ff7f 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -273,10 +273,13 @@ action(list_consumers, Node, _Args, Opts, Inform) -> Inform("Listing consumers", []), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), InfoKeys = [queue_name, channel_pid, consumer_tag, ack_required], - display_info_list( - [lists:zip(InfoKeys, tuple_to_list(X)) || - X <- rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg])], - InfoKeys); + case rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg]) of + L when is_list(L) -> display_info_list( + [lists:zip(InfoKeys, tuple_to_list(X)) || + X <- L], + InfoKeys); + Other -> Other + end; action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), -- cgit v1.2.1 From 34290816e2bdfe4205bda82c6ae95aafef319d65 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 20 Jan 2011 23:12:42 +0000 Subject: move channel lookups into handle_ methods This avoids double lookup in some paths and unnecessary invocations of run_message_queue. Also inline recorde_current_channel_tx, which eliminates some lookup. And allow rollback to trigger the forgetting of a channel record. --- src/rabbit_amqqueue_process.erl | 72 +++++++++++++++++------------------------ 1 file changed, 29 insertions(+), 43 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 48192dcb..0346ec7d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -332,11 +332,6 @@ ch_record_state_transition(OldCR, NewCR) -> true -> ok end. -record_current_channel_tx(ChPid, Txn) -> - %% as a side effect this also starts monitoring the channel (if - %% that wasn't happening already) - store_ch_record((ch_record(ChPid))#cr{txn = Txn}). - deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, State = #q{q = #amqqueue{name = QName}, active_consumers = ActiveConsumers, @@ -495,7 +490,7 @@ attempt_delivery(#delivery{txn = Txn, {NeedsConfirming, State = #q{backing_queue = BQ, backing_queue_state = BQS}}) -> - record_current_channel_tx(ChPid, Txn), + store_ch_record((ch_record(ChPid))#cr{txn = Txn}), {true, NeedsConfirming, State#q{backing_queue_state = @@ -591,7 +586,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> true -> {stop, State1}; false -> State2 = case Txn of none -> State1; - _ -> rollback_transaction(Txn, ChPid, + _ -> rollback_transaction(Txn, C, State1) end, {ok, requeue_and_run(sets:to_list(ChAckTags), @@ -627,39 +622,24 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> run_message_queue( confirm_messages(Guids, State#q{backing_queue_state = BQS1})). -commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, - backing_queue_state = BQS, - ttl = TTL}) -> - %% ChPid must be known here because of the participant management - %% by the channel. However, in a cluster, the DOWN can overtake - %% the commit, and so there is a case where handle_ch_down has - %% already been called for ChPid. - case lookup_ch(ChPid) of - not_found -> - gen_server2:reply(From, ok), - State; - C = #cr{acktags = ChAckTags} -> - {AckTags, BQS1} = BQ:tx_commit( - Txn, fun () -> gen_server2:reply(From, ok) end, - reset_msg_expiry_fun(TTL), BQS), - ChAckTags1 = subtract_acks(ChAckTags, AckTags), - maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}), - State#q{backing_queue_state = BQS1} - end. - -rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - case lookup_ch(ChPid) of - not_found -> - State; - #cr{} -> - {_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS), - %% Iff we removed acktags from the channel record on - %% ack+txn then we would add them back in here (would also - %% require ChPid) - record_current_channel_tx(ChPid, none), - State#q{backing_queue_state = BQS1} - end. +commit_transaction(Txn, From, C = #cr{acktags = ChAckTags}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS, + ttl = TTL}) -> + {AckTags, BQS1} = BQ:tx_commit( + Txn, fun () -> gen_server2:reply(From, ok) end, + reset_msg_expiry_fun(TTL), BQS), + ChAckTags1 = subtract_acks(ChAckTags, AckTags), + maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}), + State#q{backing_queue_state = BQS1}. + +rollback_transaction(Txn, C, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS), + %% Iff we removed acktags from the channel record on ack+txn then + %% we would add them back in here. + maybe_store_ch_record(C#cr{txn = none}), + State#q{backing_queue_state = BQS1}. subtract_acks(A, B) when is_list(B) -> lists:foldl(fun sets:del_element/2, A, B). @@ -860,8 +840,11 @@ handle_call({deliver, Delivery}, From, State) -> noreply(NewState); handle_call({commit, Txn, ChPid}, From, State) -> - NewState = commit_transaction(Txn, From, ChPid, State), - noreply(run_message_queue(NewState)); + case lookup_ch(ChPid) of + not_found -> reply(ok, State); + C -> noreply(run_message_queue( + commit_transaction(Txn, From, C, State))) + end; handle_call({notify_down, ChPid}, _From, State) -> %% we want to do this synchronously, so that auto_deleted queues @@ -1060,7 +1043,10 @@ handle_cast({reject, AckTags, Requeue, ChPid}, end; handle_cast({rollback, Txn, ChPid}, State) -> - noreply(rollback_transaction(Txn, ChPid, State)); + noreply(case lookup_ch(ChPid) of + not_found -> State; + C -> rollback_transaction(Txn, C, State) + end); handle_cast(delete_immediately, State) -> {stop, normal, State}; -- cgit v1.2.1 From b370750e95f2788569d5363f24ab08899f9e63fd Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 21 Jan 2011 12:05:06 +0000 Subject: Fix docs --- docs/rabbitmqctl.1.xml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index ccc7c970..93c85617 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1193,9 +1193,7 @@ confirm - Confirm mode for the channel. Either - none, single or - multiple. + True if the channel is in confirm mode, false otherwise. unconfirmed -- cgit v1.2.1 From 77b1a8bd799b20ed3c56bb1a5103f36f099385ac Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 21 Jan 2011 12:27:57 +0000 Subject: Correct unused variable name. --- src/rabbit_channel.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index eab20469..5de9c055 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -488,7 +488,7 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -remove_queue_unconfirmed(none, _XQ, Acc, _State) -> +remove_queue_unconfirmed(none, _QPid, Acc, _State) -> Acc; remove_queue_unconfirmed({MsgSeqNo, XQ, Next}, QPid, Acc, State) -> remove_queue_unconfirmed(gb_trees:next(Next), QPid, -- cgit v1.2.1 From 0e40b583638ec14811852aa3281df677c4b7ccdb Mon Sep 17 00:00:00 2001 From: Alexandru Scvortov Date: Fri, 21 Jan 2011 13:03:11 +0000 Subject: rename variables for consistency --- src/rabbit_channel.erl | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 5de9c055..ea380a4c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -297,12 +297,12 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason}, %% TODO: this does a complete scan and partial rebuild of the %% tree, which is quite efficient. To do better we'd need to %% maintain a secondary mapping, from QPids to MsgSeqNos. - {MEs, UC1} = remove_queue_unconfirmed( + {MXs, UC1} = remove_queue_unconfirmed( gb_trees:next(gb_trees:iterator(UC)), QPid, {[], UC}, State), erase_queue_stats(QPid), noreply( - queue_blocked(QPid, record_confirms(MEs, State#ch{unconfirmed = UC1}))). + queue_blocked(QPid, record_confirms(MXs, State#ch{unconfirmed = UC1}))). handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), @@ -502,13 +502,13 @@ record_confirm(MsgSeqNo, XName, State) -> record_confirms([], State) -> State; -record_confirms(MEs, State = #ch{confirmed = C}) -> - State#ch{confirmed = [MEs | C]}. +record_confirms(MXs, State = #ch{confirmed = C}) -> + State#ch{confirmed = [MXs | C]}. confirm([], _QPid, State) -> State; confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> - {MEs, UC1} = + {MXs, UC1} = lists:foldl( fun(MsgSeqNo, {_DMs, UC0} = Acc) -> case gb_trees:lookup(MsgSeqNo, UC0) of @@ -516,14 +516,14 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> {value, XQ} -> remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State) end end, {[], UC}, MsgSeqNos), - record_confirms(MEs, State#ch{unconfirmed = UC1}). + record_confirms(MXs, State#ch{unconfirmed = UC1}). -remove_qmsg(MsgSeqNo, QPid, {XName, Qs}, {MEs, UC}, State) -> +remove_qmsg(MsgSeqNo, QPid, {XName, Qs}, {MXs, UC}, State) -> Qs1 = sets:del_element(QPid, Qs), maybe_incr_stats([{{QPid, XName}, 1}], confirm, State), case sets:size(Qs1) of - 0 -> {[{MsgSeqNo, XName} | MEs], gb_trees:delete(MsgSeqNo, UC)}; - _ -> {MEs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UC)} + 0 -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UC)}; + _ -> {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UC)} end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> -- cgit v1.2.1 From 1c6a0dc5564505e30c74155288e52d1df2458103 Mon Sep 17 00:00:00 2001 From: Alexandru Scvortov Date: Fri, 21 Jan 2011 13:09:02 +0000 Subject: fold cases --- src/rabbit_channel.erl | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 40337843..fbf0e573 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1249,17 +1249,11 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> lock_message(false, _MsgStruct, State) -> State. -send_confirms(State = #ch{confirmed = C, stats_timer = StatsTimer}) -> +send_confirms(State = #ch{confirmed = C}) -> C1 = lists:append(C), - MsgSeqNos = case rabbit_event:stats_level(StatsTimer) of - fine -> - [ begin maybe_incr_stats([{ExchangeName, 1}], - confirm, State), - MsgSeqNo - end || {MsgSeqNo, ExchangeName} <- C1]; - _ -> - [MsgSeqNo || {MsgSeqNo, _} <- C1] - end, + MsgSeqNos = [ begin maybe_incr_stats([{ExchangeName, 1}], confirm, State), + MsgSeqNo + end || {MsgSeqNo, ExchangeName} <- C1 ], send_confirms(MsgSeqNos, State #ch{confirmed = []}). send_confirms([], State) -> State; -- cgit v1.2.1 From b84acbed1020eb36250f27bd8a32c1e6fccf3965 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 21 Jan 2011 14:29:55 +0000 Subject: Rename the info item for consistency. --- docs/rabbitmqctl.1.xml | 2 +- src/rabbit_channel.erl | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 93c85617..bd9fee7d 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1196,7 +1196,7 @@ True if the channel is in confirm mode, false otherwise. - unconfirmed + messages_unconfirmed Number of published messages not yet confirmed. On channels not in confirm mode, this remains 0. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index fbf0e573..91559ea6 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -44,7 +44,7 @@ confirm, consumer_count, messages_unacknowledged, - unconfirmed, + messages_unconfirmed, acks_uncommitted, prefetch_count, client_flow_blocked]). @@ -1295,7 +1295,7 @@ i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none; i(confirm, #ch{confirm_enabled = CE}) -> CE; i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); -i(unconfirmed, #ch{unconfirmed = UC}) -> +i(messages_unconfirmed, #ch{unconfirmed = UC}) -> gb_trees:size(UC); i(messages_unacknowledged, #ch{unacked_message_q = UAMQ, uncommitted_ack_q = UAQ}) -> -- cgit v1.2.1 From 1d50f8c007729875efdab3fa4d02faa11f5e4812 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Sun, 23 Jan 2011 18:56:51 +0000 Subject: dependently type acktag producing functions s.t. the "blank ack" case is manifest Switched from blank_ack to the more universal 'undefined' in the process. --- include/rabbit_backing_queue_spec.hrl | 15 +++++++++------ src/rabbit_variable_queue.erl | 6 +++--- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 296bfdb3..accb2c0e 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -14,14 +14,13 @@ %% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. %% --type(fetch_result() :: +-type(fetch_result(Ack) :: ('empty' | %% Message, IsDelivered, AckTag, Remaining_Len - {rabbit_types:basic_message(), boolean(), ack(), non_neg_integer()})). + {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})). -type(is_durable() :: boolean()). -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). --type(ack_required() :: boolean()). -type(confirm_required() :: boolean()). -type(message_properties_transformer() :: fun ((rabbit_types:message_properties()) @@ -36,13 +35,17 @@ -spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). -spec(publish/3 :: (rabbit_types:basic_message(), rabbit_types:message_properties(), state()) -> state()). --spec(publish_delivered/4 :: (ack_required(), rabbit_types:basic_message(), +-spec(publish_delivered/4 :: (true, rabbit_types:basic_message(), rabbit_types:message_properties(), state()) - -> {ack(), state()}). + -> {ack(), state()}; + (false, rabbit_types:basic_message(), + rabbit_types:message_properties(), state()) + -> {undefined, state()}). -spec(dropwhile/2 :: (fun ((rabbit_types:message_properties()) -> boolean()), state()) -> state()). --spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}). +-spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()}; + (false, state()) -> {fetch_result(undefined), state()}). -spec(ack/2 :: ([ack()], state()) -> state()). -spec(tx_publish/4 :: (rabbit_types:txn(), rabbit_types:basic_message(), rabbit_types:message_properties(), state()) -> state()). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index f39bc964..97b33c82 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -299,7 +299,7 @@ -type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}). -type(seq_id() :: non_neg_integer()). --type(ack() :: seq_id() | 'blank_ack'). +-type(ack() :: seq_id()). -type(rates() :: #rates { egress :: {timestamp(), non_neg_integer()}, ingress :: {timestamp(), non_neg_integer()}, @@ -509,7 +509,7 @@ publish(Msg, MsgProps, State) -> publish_delivered(false, #basic_message { guid = Guid }, _MsgProps, State = #vqstate { len = 0 }) -> blind_confirm(self(), gb_sets:singleton(Guid)), - {blank_ack, a(State)}; + {undefined, a(State)}; publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, MsgProps = #message_properties { @@ -628,7 +628,7 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { MsgStatus #msg_status { is_delivered = true }, State), {SeqId, StateN}; - false -> {blank_ack, State} + false -> {undefined, State} end, PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), -- cgit v1.2.1 From 9fd4bf7fa666ea4b7763f676bc00663a42667997 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Sun, 23 Jan 2011 19:02:21 +0000 Subject: fix typos --- src/rabbit_variable_queue.erl | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 97b33c82..7142d560 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -281,12 +281,11 @@ -record(sync, { acks_persistent, acks_all, pubs, funs }). %% When we discover, on publish, that we should write some indices to -%% disk for some betas, the RAM_INDEX_BATCH_SIZE sets the number of -%% betas that we must be due to write indices for before we do any -%% work at all. This is both a minimum and a maximum - we don't write -%% fewer than RAM_INDEX_BATCH_SIZE indices out in one go, and we don't -%% write more - we can always come back on the next publish to do -%% more. +%% disk for some betas, the IO_BATCH_SIZE sets the number of betas +%% that we must be due to write indices for before we do any work at +%% all. This is both a minimum and a maximum - we don't write fewer +%% than IO_BATCH_SIZE indices out in one go, and we don't write more - +%% we can always come back on the next publish to do more. -define(IO_BATCH_SIZE, 64). -define(PERSISTENT_MSG_STORE, msg_store_persistent). -define(TRANSIENT_MSG_STORE, msg_store_transient). @@ -897,7 +896,7 @@ cons_if(true, E, L) -> [E | L]; cons_if(false, _E, L) -> L. gb_sets_maybe_insert(false, _Val, Set) -> Set; -%% when requeueing, we re-add a guid to the unconfimred set +%% when requeueing, we re-add a guid to the unconfirmed set gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set). msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, -- cgit v1.2.1