diff options
author | Emile Joubert <emile@rabbitmq.com> | 2012-04-19 13:10:35 +0100 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2012-04-19 13:10:35 +0100 |
commit | 423e12d4f72380e60ec0dac06b809fd29c9f41d0 (patch) | |
tree | 16fc4968f090baced5fc8878ed5b1b15fe88e57f | |
parent | edf4bcfe0ecddb6232a3830ba328053c0d4436e1 (diff) | |
parent | 0ecafa1504539aa01798c05e35dab7ec0a354350 (diff) | |
download | rabbitmq-server-423e12d4f72380e60ec0dac06b809fd29c9f41d0.tar.gz |
Merged bug24855 into default
42 files changed, 986 insertions, 833 deletions
@@ -42,9 +42,9 @@ BASIC_PLT=basic.plt RABBIT_PLT=rabbit.plt ifndef USE_SPECS -# our type specs rely on features and bug fixes in dialyzer that are -# only available in R14B03 upwards (R14B03 is erts 5.8.4) -USE_SPECS:=$(shell erl -noshell -eval 'io:format([list_to_integer(X) || X <- string:tokens(erlang:system_info(version), ".")] >= [5,8,4]), halt().') +# our type specs rely on callback specs, which are available in R15B +# upwards. +USE_SPECS:=$(shell erl -noshell -eval 'io:format([list_to_integer(X) || X <- string:tokens(erlang:system_info(version), ".")] >= [5,9]), halt().') endif ifndef USE_PROPER_QC diff --git a/include/rabbit_auth_backend_spec.hrl b/include/rabbit_auth_backend_spec.hrl deleted file mode 100644 index 61a2e22a..00000000 --- a/include/rabbit_auth_backend_spec.hrl +++ /dev/null @@ -1,31 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. -%% - --ifdef(use_specs). - --spec(description/0 :: () -> [{atom(), any()}]). - --spec(check_user_login/2 :: (rabbit_types:username(), [term()]) -> - {'ok', rabbit_types:user()} | - {'refused', string(), [any()]} | - {'error', any()}). --spec(check_vhost_access/2 :: (rabbit_types:user(), rabbit_types:vhost()) -> - boolean() | {'error', any()}). --spec(check_resource_access/3 :: (rabbit_types:user(), - rabbit_types:r(atom()), - rabbit_access_control:permission_atom()) -> - boolean() | {'error', any()}). --endif. diff --git a/include/rabbit_auth_mechanism_spec.hrl b/include/rabbit_auth_mechanism_spec.hrl deleted file mode 100644 index 9a2f5e05..00000000 --- a/include/rabbit_auth_mechanism_spec.hrl +++ /dev/null @@ -1,28 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. -%% - --ifdef(use_specs). - --spec(description/0 :: () -> [{atom(), any()}]). --spec(should_offer/1 :: (rabbit_net:socket()) -> boolean()). --spec(init/1 :: (rabbit_net:socket()) -> any()). --spec(handle_response/2 :: (binary(), any()) -> - {'ok', rabbit_types:user()} | - {'challenge', binary(), any()} | - {'protocol_error', string(), [any()]} | - {'refused', string(), [any()]}). - --endif. diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl deleted file mode 100644 index 79d44e1b..00000000 --- a/include/rabbit_backing_queue_spec.hrl +++ /dev/null @@ -1,71 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. -%% - --type(fetch_result(Ack) :: - ('empty' | - %% Message, IsDelivered, AckTag, Remaining_Len - {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})). --type(is_durable() :: boolean()). --type(attempt_recovery() :: boolean()). --type(purged_msg_count() :: non_neg_integer()). --type(confirm_required() :: boolean()). --type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). --type(duration() :: ('undefined' | 'infinity' | number())). - --type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') | - 'undefined'). - --spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok'). --spec(stop/0 :: () -> 'ok'). --spec(init/3 :: (rabbit_types:amqqueue(), attempt_recovery(), - async_callback()) -> state()). --spec(terminate/2 :: (any(), state()) -> state()). --spec(delete_and_terminate/2 :: (any(), state()) -> state()). --spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). --spec(publish/4 :: (rabbit_types:basic_message(), - rabbit_types:message_properties(), pid(), state()) -> - state()). --spec(publish_delivered/5 :: (true, rabbit_types:basic_message(), - rabbit_types:message_properties(), pid(), state()) - -> {ack(), state()}; - (false, rabbit_types:basic_message(), - rabbit_types:message_properties(), pid(), state()) - -> {undefined, state()}). --spec(drain_confirmed/1 :: (state()) -> {[rabbit_guid:guid()], state()}). --spec(dropwhile/3 :: - (fun ((rabbit_types:message_properties()) -> boolean()), msg_fun(), - state()) - -> state()). --spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()}; - (false, state()) -> {fetch_result(undefined), state()}). --spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). --spec(fold/3 :: (msg_fun(), state(), [ack()]) -> state()). --spec(requeue/2 :: ([ack()], state()) - -> {[rabbit_guid:guid()], state()}). --spec(len/1 :: (state()) -> non_neg_integer()). --spec(is_empty/1 :: (state()) -> boolean()). --spec(set_ram_duration_target/2 :: - (duration(), state()) -> state()). --spec(ram_duration/1 :: (state()) -> {duration(), state()}). --spec(needs_timeout/1 :: (state()) -> 'false' | 'timed' | 'idle'). --spec(timeout/1 :: (state()) -> state()). --spec(handle_pre_hibernate/1 :: (state()) -> state()). --spec(status/1 :: (state()) -> [{atom(), any()}]). --spec(invoke/3 :: (atom(), fun ((atom(), A) -> A), state()) -> state()). --spec(is_duplicate/2 :: - (rabbit_types:basic_message(), state()) -> - {'false'|'published'|'discarded', state()}). --spec(discard/3 :: (rabbit_types:basic_message(), pid(), state()) -> state()). diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl deleted file mode 100644 index 8f7e22d3..00000000 --- a/include/rabbit_exchange_type_spec.hrl +++ /dev/null @@ -1,38 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. -%% - --ifdef(use_specs). - --type(tx() :: 'transaction' | 'none'). --type(serial() :: pos_integer() | tx()). - --spec(description/0 :: () -> [{atom(), any()}]). --spec(serialise_events/0 :: () -> boolean()). --spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery()) - -> rabbit_router:match_result()). --spec(validate/1 :: (rabbit_types:exchange()) -> 'ok'). --spec(create/2 :: (tx(), rabbit_types:exchange()) -> 'ok'). --spec(delete/3 :: (tx(), rabbit_types:exchange(), - [rabbit_types:binding()]) -> 'ok'). --spec(add_binding/3 :: (serial(), rabbit_types:exchange(), - rabbit_types:binding()) -> 'ok'). --spec(remove_bindings/3 :: (serial(), rabbit_types:exchange(), - [rabbit_types:binding()]) -> 'ok'). --spec(assert_args_equivalence/2 :: - (rabbit_types:exchange(), rabbit_framing:amqp_table()) - -> 'ok' | rabbit_types:connection_exit()). - --endif. diff --git a/include/rabbit_msg_store_index.hrl b/include/rabbit_msg_store_index.hrl deleted file mode 100644 index 75d7eb71..00000000 --- a/include/rabbit_msg_store_index.hrl +++ /dev/null @@ -1,45 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. -%% - --include("rabbit_msg_store.hrl"). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --type(dir() :: any()). --type(index_state() :: any()). --type(keyvalue() :: any()). --type(fieldpos() :: non_neg_integer()). --type(fieldvalue() :: any()). - --spec(new/1 :: (dir()) -> index_state()). --spec(recover/1 :: (dir()) -> rabbit_types:ok_or_error2(index_state(), any())). --spec(lookup/2 :: - (rabbit_types:msg_id(), index_state()) -> ('not_found' | keyvalue())). --spec(insert/2 :: (keyvalue(), index_state()) -> 'ok'). --spec(update/2 :: (keyvalue(), index_state()) -> 'ok'). --spec(update_fields/3 :: (rabbit_types:msg_id(), ({fieldpos(), fieldvalue()} | - [{fieldpos(), fieldvalue()}]), - index_state()) -> 'ok'). --spec(delete/2 :: (rabbit_types:msg_id(), index_state()) -> 'ok'). --spec(delete_object/2 :: (keyvalue(), index_state()) -> 'ok'). --spec(delete_by_file/2 :: (fieldvalue(), index_state()) -> 'ok'). --spec(terminate/1 :: (index_state()) -> any()). - --endif. - -%%---------------------------------------------------------------------------- diff --git a/src/dtree.erl b/src/dtree.erl index 265bb340..ca2d30cf 100644 --- a/src/dtree.erl +++ b/src/dtree.erl @@ -16,23 +16,23 @@ %% A dual-index tree. %% -%% Conceptually, what we want is a map that has two distinct sets of -%% keys (referred to here as primary and secondary, although that -%% shouldn't imply a hierarchy) pointing to one set of -%% values. However, in practice what we'll always want to do is insert -%% a value that's pointed at by (one primary, many secondaries) and -%% remove values that are pointed at by (one secondary, many -%% primaries) or (one secondary, all primaries). Thus the API. +%% Entries have the following shape: %% -%% Entries exists while they have a non-empty secondary key set. The -%% 'take' operations return the entries that got removed, i.e. that -%% had no remaining secondary keys. take/3 expects entries to exist -%% with the supplied primary keys and secondary key. take/2 can cope -%% with the supplied secondary key having no entries. +%% +----+--------------------+---+ +%% | PK | SK1, SK2, ..., SKN | V | +%% +----+--------------------+---+ +%% +%% i.e. a primary key, set of secondary keys, and a value. +%% +%% There can be only one entry per primary key, but secondary keys may +%% appear in multiple entries. +%% +%% The set of secondary keys must be non-empty. Or, to put it another +%% way, entries only exist while their secondary key set is non-empty. -module(dtree). --export([empty/0, insert/4, take/3, take/2, +-export([empty/0, insert/4, take/3, take/2, take_all/2, is_defined/2, is_empty/1, smallest/1, size/1]). %%---------------------------------------------------------------------------- @@ -52,6 +52,7 @@ -spec(insert/4 :: (pk(), [sk()], val(), ?MODULE()) -> ?MODULE()). -spec(take/3 :: ([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}). -spec(take/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}). +-spec(take_all/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}). -spec(is_defined/2 :: (sk(), ?MODULE()) -> boolean()). -spec(is_empty/1 :: (?MODULE()) -> boolean()). -spec(smallest/1 :: (?MODULE()) -> kv()). @@ -63,6 +64,12 @@ empty() -> {gb_trees:empty(), gb_trees:empty()}. +%% Insert an entry. Fails if there already is an entry with the given +%% primary key. +insert(PK, [], V, {P, S}) -> + %% dummy insert to force error if PK exists + gb_trees:insert(PK, {gb_sets:empty(), V}, P), + {P, S}; insert(PK, SKs, V, {P, S}) -> {gb_trees:insert(PK, {gb_sets:from_list(SKs), V}, P), lists:foldl(fun (SK, S0) -> @@ -74,21 +81,45 @@ insert(PK, SKs, V, {P, S}) -> end end, S, SKs)}. +%% Remove the given secondary key from the entries of the given +%% primary keys, returning the primary-key/value pairs of any entries +%% that were dropped as the result (i.e. due to their secondary key +%% set becoming empty). It is ok for the given primary keys and/or +%% secondary key to not exist. take(PKs, SK, {P, S}) -> - {KVs, P1} = take2(PKs, SK, P), - PKS = gb_sets:difference(gb_trees:get(SK, S), gb_sets:from_list(PKs)), - {KVs, {P1, case gb_sets:is_empty(PKS) of - true -> gb_trees:delete(SK, S); - false -> gb_trees:update(SK, PKS, S) - end}}. + case gb_trees:lookup(SK, S) of + none -> {[], {P, S}}; + {value, PKS} -> TakenPKS = gb_sets:from_list(PKs), + PKSInter = gb_sets:intersection(PKS, TakenPKS), + PKSDiff = gb_sets_difference (PKS, PKSInter), + {KVs, P1} = take2(PKSInter, SK, P), + {KVs, {P1, case gb_sets:is_empty(PKSDiff) of + true -> gb_trees:delete(SK, S); + false -> gb_trees:update(SK, PKSDiff, S) + end}} + end. +%% Remove the given secondary key from all entries, returning the +%% primary-key/value pairs of any entries that were dropped as the +%% result (i.e. due to their secondary key set becoming empty). It is +%% ok for the given secondary key to not exist. take(SK, {P, S}) -> case gb_trees:lookup(SK, S) of none -> {[], {P, S}}; - {value, PKS} -> {KVs, P1} = take2(gb_sets:to_list(PKS), SK, P), + {value, PKS} -> {KVs, P1} = take2(PKS, SK, P), {KVs, {P1, gb_trees:delete(SK, S)}} end. +%% Drop all entries which contain the given secondary key, returning +%% the primary-key/value pairs of these entries. It is ok for the +%% given secondary key to not exist. +take_all(SK, {P, S}) -> + case gb_trees:lookup(SK, S) of + none -> {[], {P, S}}; + {value, PKS} -> {KVs, SKS, P1} = take_all2(PKS, P), + {KVs, {P1, prune(SKS, PKS, S)}} + end. + is_defined(SK, {_P, S}) -> gb_trees:is_defined(SK, S). is_empty({P, _S}) -> gb_trees:is_empty(P). @@ -100,12 +131,33 @@ size({P, _S}) -> gb_trees:size(P). %%---------------------------------------------------------------------------- -take2(PKs, SK, P) -> - lists:foldl(fun (PK, {KVs, P0}) -> - {SKS, V} = gb_trees:get(PK, P0), - SKS1 = gb_sets:delete(SK, SKS), - case gb_sets:is_empty(SKS1) of - true -> {[{PK, V} | KVs], gb_trees:delete(PK, P0)}; - false -> {KVs, gb_trees:update(PK, {SKS1, V}, P0)} - end - end, {[], P}, PKs). +take2(PKS, SK, P) -> + gb_sets:fold(fun (PK, {KVs, P0}) -> + {SKS, V} = gb_trees:get(PK, P0), + SKS1 = gb_sets:delete(SK, SKS), + case gb_sets:is_empty(SKS1) of + true -> KVs1 = [{PK, V} | KVs], + {KVs1, gb_trees:delete(PK, P0)}; + false -> {KVs, gb_trees:update(PK, {SKS1, V}, P0)} + end + end, {[], P}, PKS). + +take_all2(PKS, P) -> + gb_sets:fold(fun (PK, {KVs, SKS0, P0}) -> + {SKS, V} = gb_trees:get(PK, P0), + {[{PK, V} | KVs], gb_sets:union(SKS, SKS0), + gb_trees:delete(PK, P0)} + end, {[], gb_sets:empty(), P}, PKS). + +prune(SKS, PKS, S) -> + gb_sets:fold(fun (SK0, S0) -> + PKS1 = gb_trees:get(SK0, S0), + PKS2 = gb_sets_difference(PKS1, PKS), + case gb_sets:is_empty(PKS2) of + true -> gb_trees:delete(SK0, S0); + false -> gb_trees:update(SK0, PKS2, S0) + end + end, S, SKS). + +gb_sets_difference(S1, S2) -> + gb_sets:fold(fun gb_sets:delete_any/2, S1, S2). diff --git a/src/gen_server2.erl b/src/gen_server2.erl index f8537487..78bbbe06 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -31,13 +31,13 @@ %% handle_pre_hibernate/1 then the default action is to hibernate. %% %% 6) init can return a 4th arg, {backoff, InitialTimeout, -%% MinimumTimeout, DesiredHibernatePeriod} (all in -%% milliseconds). Then, on all callbacks which can return a timeout -%% (including init), timeout can be 'hibernate'. When this is the -%% case, the current timeout value will be used (initially, the -%% InitialTimeout supplied from init). After this timeout has -%% occurred, hibernation will occur as normal. Upon awaking, a new -%% current timeout value will be calculated. +%% MinimumTimeout, DesiredHibernatePeriod} (all in milliseconds, +%% 'infinity' does not make sense here). Then, on all callbacks which +%% can return a timeout (including init), timeout can be +%% 'hibernate'. When this is the case, the current timeout value will +%% be used (initially, the InitialTimeout supplied from init). After +%% this timeout has occurred, hibernation will occur as normal. Upon +%% awaking, a new current timeout value will be calculated. %% %% The purpose is that the gen_server2 takes care of adjusting the %% current timeout value such that the process will increase the @@ -135,9 +135,10 @@ %%% Reason = normal | shutdown | Term, terminate(State) is called %%% %%% terminate(Reason, State) Let the user module clean up +%%% Reason = normal | shutdown | {shutdown, Term} | Term %%% always called when server terminates %%% -%%% ==> ok +%%% ==> ok | Term %%% %%% handle_pre_hibernate(State) %%% @@ -182,8 +183,6 @@ multi_call/2, multi_call/3, multi_call/4, enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/1]). --export([behaviour_info/1]). - %% System exports -export([system_continue/3, system_terminate/4, @@ -200,12 +199,12 @@ timeout_state, queue, debug, prioritise_call, prioritise_cast, prioritise_info}). +-ifdef(use_specs). + %%%========================================================================= %%% Specs. These exist only to shut up dialyzer's warnings %%%========================================================================= --ifdef(use_specs). - -type(gs2_state() :: #gs2_state{}). -spec(handle_common_termination/3 :: @@ -214,18 +213,58 @@ -spec(pre_hibernate/1 :: (gs2_state()) -> no_return()). -spec(system_terminate/4 :: (_, _, _, gs2_state()) -> no_return()). --endif. +-type(millis() :: non_neg_integer()). %%%========================================================================= %%% API %%%========================================================================= +-callback init(Args :: term()) -> + {ok, State :: term()} | + {ok, State :: term(), timeout() | hibernate} | + {ok, State :: term(), timeout() | hibernate, + {backoff, millis(), millis(), millis()}} | + ignore | + {stop, Reason :: term()}. +-callback handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: term()) -> + {reply, Reply :: term(), NewState :: term()} | + {reply, Reply :: term(), NewState :: term(), timeout() | hibernate} | + {noreply, NewState :: term()} | + {noreply, NewState :: term(), timeout() | hibernate} | + {stop, Reason :: term(), + Reply :: term(), NewState :: term()}. +-callback handle_cast(Request :: term(), State :: term()) -> + {noreply, NewState :: term()} | + {noreply, NewState :: term(), timeout() | hibernate} | + {stop, Reason :: term(), NewState :: term()}. +-callback handle_info(Info :: term(), State :: term()) -> + {noreply, NewState :: term()} | + {noreply, NewState :: term(), timeout() | hibernate} | + {stop, Reason :: term(), NewState :: term()}. +-callback terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: term()) -> + ok | term(). +-callback code_change(OldVsn :: (term() | {down, term()}), State :: term(), + Extra :: term()) -> + {ok, NewState :: term()} | {error, Reason :: term()}. + +%% It's not possible to define "optional" -callbacks, so putting specs +%% for handle_pre_hibernate/1 and handle_post_hibernate/1 will result +%% in warnings (the same applied for the behaviour_info before). + +-else. + +-export([behaviour_info/1]). + behaviour_info(callbacks) -> [{init,1},{handle_call,3},{handle_cast,2},{handle_info,2}, {terminate,2},{code_change,3}]; behaviour_info(_Other) -> undefined. +-endif. + %%% ----------------------------------------------------------------- %%% Starts a generic server. %%% start(Mod, Args, Options) @@ -57,8 +57,8 @@ %% you wish to be passed into the callback module's functions. The %% joined/2 function will be called when we have joined the group, %% with the arguments passed to start_link and a list of the current -%% members of the group. See the comments in behaviour_info/1 below -%% for further details of the callback functions. +%% members of the group. See the callbacks specs and the comments +%% below for further details of the callback functions. %% %% leave/1 %% Provide the Pid. Removes the Pid from the group. The callback @@ -378,7 +378,9 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_info/2]). +-ifndef(use_specs). -export([behaviour_info/1]). +-endif. -export([table_definitions/0]). @@ -431,56 +433,63 @@ -spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok'). -spec(group_members/1 :: (pid()) -> [pid()]). --endif. +%% The joined, members_changed and handle_msg callbacks can all +%% return any of the following terms: +%% +%% 'ok' - the callback function returns normally +%% +%% {'stop', Reason} - the callback indicates the member should +%% stop with reason Reason and should leave the group. +%% +%% {'become', Module, Args} - the callback indicates that the +%% callback module should be changed to Module and that the +%% callback functions should now be passed the arguments +%% Args. This allows the callback module to be dynamically +%% changed. + +%% Called when we've successfully joined the group. Supplied with +%% Args provided in start_link, plus current group members. +-callback joined(Args :: term(), Members :: [pid()]) -> + ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}. + +%% Supplied with Args provided in start_link, the list of new +%% members and the list of members previously known to us that +%% have since died. Note that if a member joins and dies very +%% quickly, it's possible that we will never see that member +%% appear in either births or deaths. However we are guaranteed +%% that (1) we will see a member joining either in the births +%% here, or in the members passed to joined/2 before receiving +%% any messages from it; and (2) we will not see members die that +%% we have not seen born (or supplied in the members to +%% joined/2). +-callback members_changed(Args :: term(), Births :: [pid()], + Deaths :: [pid()]) -> + ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}. + +%% Supplied with Args provided in start_link, the sender, and the +%% message. This does get called for messages injected by this +%% member, however, in such cases, there is no special +%% significance of this invocation: it does not indicate that the +%% message has made it to any other members, let alone all other +%% members. +-callback handle_msg(Args :: term(), From :: pid(), Message :: term()) -> + ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}. + +%% Called on gm member termination as per rules in gen_server, +%% with the Args provided in start_link plus the termination +%% Reason. +-callback terminate(Args :: term(), Reason :: term()) -> + ok | term(). + +-else. behaviour_info(callbacks) -> - [ - %% The joined, members_changed and handle_msg callbacks can all - %% return any of the following terms: - %% - %% 'ok' - the callback function returns normally - %% - %% {'stop', Reason} - the callback indicates the member should - %% stop with reason Reason and should leave the group. - %% - %% {'become', Module, Args} - the callback indicates that the - %% callback module should be changed to Module and that the - %% callback functions should now be passed the arguments - %% Args. This allows the callback module to be dynamically - %% changed. - - %% Called when we've successfully joined the group. Supplied with - %% Args provided in start_link, plus current group members. - {joined, 2}, - - %% Supplied with Args provided in start_link, the list of new - %% members and the list of members previously known to us that - %% have since died. Note that if a member joins and dies very - %% quickly, it's possible that we will never see that member - %% appear in either births or deaths. However we are guaranteed - %% that (1) we will see a member joining either in the births - %% here, or in the members passed to joined/2 before receiving - %% any messages from it; and (2) we will not see members die that - %% we have not seen born (or supplied in the members to - %% joined/2). - {members_changed, 3}, - - %% Supplied with Args provided in start_link, the sender, and the - %% message. This does get called for messages injected by this - %% member, however, in such cases, there is no special - %% significance of this invocation: it does not indicate that the - %% message has made it to any other members, let alone all other - %% members. - {handle_msg, 3}, - - %% Called on gm member termination as per rules in gen_server, - %% with the Args provided in start_link plus the termination - %% Reason. - {terminate, 2} - ]; + [{joined, 2}, {members_changed, 3}, {handle_msg, 3}, {terminate, 2}]; behaviour_info(_Other) -> undefined. +-endif. + create_tables() -> create_tables([?TABLE]). diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl index a599effa..221f6a87 100644 --- a/src/mirrored_supervisor.erl +++ b/src/mirrored_supervisor.erl @@ -120,8 +120,6 @@ delete_child/2, terminate_child/2, which_children/1, count_children/1, check_childspecs/1]). --export([behaviour_info/1]). - -behaviour(?GEN_SERVER). -behaviour(?SUPERVISOR). @@ -142,15 +140,20 @@ -ifdef(use_specs). --type child() :: pid() | 'undefined'. --type child_id() :: term(). --type modules() :: [module()] | 'dynamic'. --type worker() :: 'worker' | 'supervisor'. --type sup_name() :: {'local', Name :: atom()} | {'global', Name :: atom()}. --type sup_ref() :: (Name :: atom()) - | {Name :: atom(), Node :: node()} - | {'global', Name :: atom()} - | pid(). +%%-------------------------------------------------------------------------- +%% Callback behaviour +%%-------------------------------------------------------------------------- + +-callback init(Args :: term()) -> + {ok, {{RestartStrategy :: supervisor2:strategy(), + MaxR :: non_neg_integer(), + MaxT :: non_neg_integer()}, + [ChildSpec :: supervisor2:child_spec()]}} + | ignore. + +%%-------------------------------------------------------------------------- +%% Specs +%%-------------------------------------------------------------------------- -type startlink_err() :: {'already_started', pid()} | 'shutdown' | term(). -type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}. @@ -163,54 +166,26 @@ Args :: term(). -spec start_link(SupName, GroupName, Module, Args) -> startlink_ret() when - SupName :: sup_name(), + SupName :: supervisor2:sup_name(), GroupName :: group_name(), Module :: module(), Args :: term(). --spec start_child(SupRef, ChildSpec) -> supervisor:startchild_ret() when - SupRef :: sup_ref(), - ChildSpec :: supervisor:child_spec() | (List :: [term()]). - --spec restart_child(SupRef, Id) -> Result when - SupRef :: sup_ref(), - Id :: child_id(), - Result :: {'ok', Child :: child()} - | {'ok', Child :: child(), Info :: term()} - | {'error', Error}, - Error :: 'running' | 'not_found' | 'simple_one_for_one' | term(). - --spec delete_child(SupRef, Id) -> Result when - SupRef :: sup_ref(), - Id :: child_id(), - Result :: 'ok' | {'error', Error}, - Error :: 'running' | 'not_found' | 'simple_one_for_one'. - --spec terminate_child(SupRef, Id) -> Result when - SupRef :: sup_ref(), - Id :: pid() | child_id(), - Result :: 'ok' | {'error', Error}, - Error :: 'not_found' | 'simple_one_for_one'. - --spec which_children(SupRef) -> [{Id,Child,Type,Modules}] when - SupRef :: sup_ref(), - Id :: child_id() | 'undefined', - Child :: child(), - Type :: worker(), - Modules :: modules(). - --spec check_childspecs(ChildSpecs) -> Result when - ChildSpecs :: [supervisor:child_spec()], - Result :: 'ok' | {'error', Error :: term()}. - -spec start_internal(Group, ChildSpecs) -> Result when Group :: group_name(), - ChildSpecs :: [supervisor:child_spec()], - Result :: startlink_ret(). + ChildSpecs :: [supervisor2:child_spec()], + Result :: supervisor2:startlink_ret(). -spec create_tables() -> Result when Result :: 'ok'. +-else. + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> [{init,1}]; +behaviour_info(_Other) -> undefined. + -endif. %%---------------------------------------------------------------------------- @@ -250,9 +225,6 @@ which_children(Sup) -> fold(which_children, Sup, fun lists:append/2). count_children(Sup) -> fold(count_children, Sup, fun add_proplists/2). check_childspecs(Specs) -> ?SUPERVISOR:check_childspecs(Specs). -behaviour_info(callbacks) -> [{init,1}]; -behaviour_info(_Other) -> undefined. - call(Sup, Msg) -> ?GEN_SERVER:call(child(Sup, mirroring), Msg, infinity). diff --git a/src/pmon.erl b/src/pmon.erl new file mode 100644 index 00000000..45786577 --- /dev/null +++ b/src/pmon.erl @@ -0,0 +1,64 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved. +%% + +-module(pmon). + +-export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2, + monitored/1, is_empty/1]). + +-ifdef(use_specs). + +%%---------------------------------------------------------------------------- + +-export_type([?MODULE/0]). + +-opaque(?MODULE() :: dict()). + +-spec(new/0 :: () -> ?MODULE()). +-spec(monitor/2 :: (pid(), ?MODULE()) -> ?MODULE()). +-spec(monitor_all/2 :: ([pid()], ?MODULE()) -> ?MODULE()). +-spec(demonitor/2 :: (pid(), ?MODULE()) -> ?MODULE()). +-spec(is_monitored/2 :: (pid(), ?MODULE()) -> boolean()). +-spec(erase/2 :: (pid(), ?MODULE()) -> ?MODULE()). +-spec(monitored/1 :: (?MODULE()) -> [pid()]). +-spec(is_empty/1 :: (?MODULE()) -> boolean()). + +-endif. + +new() -> dict:new(). + +monitor(Pid, M) -> + case dict:is_key(Pid, M) of + true -> M; + false -> dict:store(Pid, erlang:monitor(process, Pid), M) + end. + +monitor_all(Pids, M) -> lists:foldl(fun monitor/2, M, Pids). + +demonitor(Pid, M) -> + case dict:find(Pid, M) of + {ok, MRef} -> erlang:demonitor(MRef), + dict:erase(Pid, M); + error -> M + end. + +is_monitored(Pid, M) -> dict:is_key(Pid, M). + +erase(Pid, M) -> dict:erase(Pid, M). + +monitored(M) -> dict:fetch_keys(M). + +is_empty(M) -> dict:size(M) == 0. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9ecbcbc3..c1673504 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -109,7 +109,7 @@ -spec(stat/1 :: (rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}). --spec(delete_immediately/1 :: (rabbit_types:amqqueue()) -> 'ok'). +-spec(delete_immediately/1 :: (qpids()) -> 'ok'). -spec(delete/3 :: (rabbit_types:amqqueue(), 'false', 'false') -> qlen(); @@ -331,7 +331,7 @@ assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, check_declare_arguments(QueueName, Args) -> Checks = [{<<"x-expires">>, fun check_positive_int_arg/2}, - {<<"x-message-ttl">>, fun check_positive_int_arg/2}, + {<<"x-message-ttl">>, fun check_non_neg_int_arg/2}, {<<"x-ha-policy">>, fun check_ha_policy_arg/2}, {<<"x-dead-letter-exchange">>, fun check_string_arg/2}, {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}], @@ -353,11 +353,24 @@ check_string_arg({longstr, _}, _Args) -> check_string_arg({Type, _}, _) -> {error, {unacceptable_type, Type}}. -check_positive_int_arg({Type, Val}, _Args) -> +check_int_arg({Type, _}, _) -> case lists:member(Type, ?INTEGER_ARG_TYPES) of - false -> {error, {unacceptable_type, Type}}; - true when Val =< 0 -> {error, {value_zero_or_less, Val}}; - true -> ok + true -> ok; + false -> {error, {unacceptable_type, Type}} + end. + +check_positive_int_arg({Type, Val}, Args) -> + case check_int_arg({Type, Val}, Args) of + ok when Val > 0 -> ok; + ok -> {error, {value_zero_or_less, Val}}; + Error -> Error + end. + +check_non_neg_int_arg({Type, Val}, Args) -> + case check_int_arg({Type, Val}, Args) of + ok when Val >= 0 -> ok; + ok -> {error, {value_less_than_zero, Val}}; + Error -> Error end. check_dlxrk_arg({longstr, _}, Args) -> @@ -455,8 +468,9 @@ consumers_all(VHostPath) -> stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat). -delete_immediately(#amqqueue{ pid = QPid }) -> - gen_server2:cast(QPid, delete_immediately). +delete_immediately(QPids) -> + [gen_server2:cast(QPid, delete_immediately) || QPid <- QPids], + ok. delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> delegate_call(QPid, {delete, IfUnused, IfEmpty}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 75b92f1f..2063e557 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -26,7 +26,7 @@ -export([start_link/1, info_keys/0]). --export([init_with_backing_queue_state/7]). +-export([init_with_backing_queue_state/8]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, @@ -47,6 +47,7 @@ msg_id_to_channel, ttl, ttl_timer_ref, + senders, publish_seqno, unconfirmed, delayed_stop, @@ -74,9 +75,9 @@ -spec(start_link/1 :: (rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). --spec(init_with_backing_queue_state/7 :: +-spec(init_with_backing_queue_state/8 :: (rabbit_types:amqqueue(), atom(), tuple(), any(), [any()], - [rabbit_types:delivery()], dict()) -> #q{}). + [rabbit_types:delivery()], pmon:pmon(), dict()) -> #q{}). -endif. @@ -131,18 +132,19 @@ init(Q) -> rate_timer_ref = undefined, expiry_timer_ref = undefined, ttl = undefined, + senders = pmon:new(), dlx = undefined, dlx_routing_key = undefined, publish_seqno = 1, unconfirmed = dtree:empty(), delayed_stop = undefined, - queue_monitors = dict:new(), + queue_monitors = pmon:new(), msg_id_to_channel = gb_trees:empty()}, {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, - RateTRef, AckTags, Deliveries, MTC) -> + RateTRef, AckTags, Deliveries, Senders, MTC) -> case Owner of none -> ok; _ -> erlang:monitor(process, Owner) @@ -158,10 +160,11 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, rate_timer_ref = RateTRef, expiry_timer_ref = undefined, ttl = undefined, + senders = Senders, publish_seqno = 1, unconfirmed = dtree:empty(), delayed_stop = undefined, - queue_monitors = dict:new(), + queue_monitors = pmon:new(), msg_id_to_channel = MTC}, State1 = requeue_and_run(AckTags, process_args( rabbit_event:init_stats_timer( @@ -538,17 +541,25 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm, State#q{backing_queue_state = BQS1}} end. -deliver_or_enqueue(Delivery = #delivery{message = Message, - sender = SenderPid}, State) -> +deliver_or_enqueue(Delivery = #delivery{message = Message, + msg_seq_no = MsgSeqNo, + sender = SenderPid}, State) -> Confirm = should_confirm_message(Delivery, State), - {Delivered, State1} = attempt_delivery(Delivery, Confirm, State), - State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = - maybe_record_confirm_message(Confirm, State1), - case Delivered of - true -> State2; - false -> Props = message_properties(Confirm, State), - BQS1 = BQ:publish(Message, Props, SenderPid, BQS), - ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) + case attempt_delivery(Delivery, Confirm, State) of + {true, State1} -> + maybe_record_confirm_message(Confirm, State1); + %% the next two are optimisations + {false, State1 = #q{ttl = 0, dlx = undefined}} when Confirm == never -> + discard_delivery(Delivery, State1); + {false, State1 = #q{ttl = 0, dlx = undefined}} -> + rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), + discard_delivery(Delivery, State1); + {false, State1} -> + State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = + maybe_record_confirm_message(Confirm, State1), + Props = message_properties(Confirm, State2), + BQS1 = BQ:publish(Message, Props, SenderPid, BQS), + ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) end. requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> @@ -597,16 +608,16 @@ should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; should_auto_delete(#q{has_had_consumers = false}) -> false; should_auto_delete(State) -> is_unused(State). -handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> - case get({ch_publisher, DownPid}) of - undefined -> ok; - MRef -> erlang:demonitor(MRef), - erase({ch_publisher, DownPid}), - credit_flow:peer_down(DownPid) - end, +handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, + senders = Senders}) -> + Senders1 = case pmon:is_monitored(DownPid, Senders) of + false -> Senders; + true -> credit_flow:peer_down(DownPid), + pmon:demonitor(DownPid, Senders) + end, case lookup_ch(DownPid) of not_found -> - {ok, State}; + {ok, State#q{senders = Senders1}}; C = #cr{ch_pid = ChPid, acktags = ChAckTags, blocked_consumers = Blocked} -> @@ -618,7 +629,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> Other -> Other end, active_consumers = remove_consumers( - ChPid, State#q.active_consumers)}, + ChPid, State#q.active_consumers), + senders = Senders1}, case should_auto_delete(State1) of true -> {stop, State1}; false -> {ok, requeue_and_run(sets:to_list(ChAckTags), @@ -719,57 +731,51 @@ dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) -> State) end. +dead_letter_publish(Msg, Reason, + State = #q{publish_seqno = MsgSeqNo, + dlx = DLX}) -> + Delivery = #delivery{message = #basic_message{exchange_name = XName}} = + rabbit_basic:delivery( + false, false, make_dead_letter_msg(DLX, Reason, Msg, State), + MsgSeqNo), + {ok, X} = rabbit_exchange:lookup(XName), + Queues = rabbit_exchange:route(X, Delivery), + {Queues1, Cycles} = detect_dead_letter_cycles(Delivery, Queues), + lists:foreach(fun log_cycle_once/1, Cycles), + QPids = rabbit_amqqueue:lookup(Queues1), + {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery), + DeliveredQPids. + dead_letter_msg_existing_dlx(Msg, AckTag, Reason, State = #q{publish_seqno = MsgSeqNo, - unconfirmed = UC, - dlx = DLX}) -> - {ok, _, QPids} = - rabbit_basic:publish( - rabbit_basic:delivery( - false, false, make_dead_letter_msg(DLX, Reason, Msg, State), - MsgSeqNo)), - State1 = lists:foldl(fun monitor_queue/2, State, QPids), - State2 = State1#q{publish_seqno = MsgSeqNo + 1}, + unconfirmed = UC}) -> + QPids = dead_letter_publish(Msg, Reason, State), + State1 = State#q{queue_monitors = pmon:monitor_all( + QPids, State#q.queue_monitors), + publish_seqno = MsgSeqNo + 1}, case QPids of - [] -> cleanup_after_confirm([AckTag], State2); + [] -> cleanup_after_confirm([AckTag], State1); _ -> UC1 = dtree:insert(MsgSeqNo, QPids, AckTag, UC), - noreply(State2#q{unconfirmed = UC1}) - end. - -monitor_queue(QPid, State = #q{queue_monitors = QMons}) -> - case dict:is_key(QPid, QMons) of - true -> State; - false -> State#q{queue_monitors = - dict:store(QPid, erlang:monitor(process, QPid), - QMons)} - end. - -demonitor_queue(QPid, State = #q{queue_monitors = QMons}) -> - case dict:find(QPid, QMons) of - {ok, MRef} -> erlang:demonitor(MRef), - State#q{queue_monitors = dict:erase(QPid, QMons)}; - error -> State + noreply(State1#q{unconfirmed = UC1}) end. handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons, unconfirmed = UC}) -> - case dict:find(QPid, QMons) of - error -> - noreply(State); - {ok, _} -> - rabbit_log:info("DLQ ~p (for ~s) died~n", - [QPid, rabbit_misc:rs(qname(State))]), - {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC), - case (MsgSeqNoAckTags =/= [] andalso - rabbit_misc:is_abnormal_termination(Reason)) of - true -> rabbit_log:warning("Dead queue lost ~p messages~n", - [length(MsgSeqNoAckTags)]); - false -> ok - end, - cleanup_after_confirm( - [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], - State#q{queue_monitors = dict:erase(QPid, QMons), - unconfirmed = UC1}) + case pmon:is_monitored(QPid, QMons) of + false -> noreply(State); + true -> case rabbit_misc:is_abnormal_termination(Reason) of + true -> {Lost, _UC1} = dtree:take_all(QPid, UC), + QNameS = rabbit_misc:rs(qname(State)), + rabbit_log:warning("DLQ ~p for ~s died with " + "~p unconfirmed messages~n", + [QPid, QNameS, length(Lost)]); + false -> ok + end, + {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC), + cleanup_after_confirm( + [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], + State#q{queue_monitors = pmon:erase(QPid, QMons), + unconfirmed = UC1}) end. stop_later(Reason, State) -> @@ -801,28 +807,31 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS, false -> noreply(State1) end. -already_been_here(_Delivery, #q{dlx = undefined}) -> - false; -already_been_here(#delivery{message = #basic_message{content = Content}}, - State) -> +detect_dead_letter_cycles(#delivery{message = #basic_message{content = Content}}, + Queues) -> #content{properties = #'P_basic'{headers = Headers}} = rabbit_binary_parser:ensure_content_decoded(Content), - #resource{name = QueueName} = qname(State), + NoCycles = {Queues, []}, case Headers of undefined -> - false; + NoCycles; _ -> case rabbit_misc:table_lookup(Headers, <<"x-death">>) of {array, DeathTables} -> OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) || {table, D} <- DeathTables], OldQueues1 = [QName || {longstr, QName} <- OldQueues], - case lists:member(QueueName, OldQueues1) of - true -> [QueueName | OldQueues1]; - _ -> false - end; + OldQueuesSet = ordsets:from_list(OldQueues1), + {Cycling, NotCycling} = + lists:partition( + fun(Queue) -> + ordsets:is_element(Queue#resource.name, + OldQueuesSet) + end, Queues), + {NotCycling, [[QName | OldQueues1] || + #resource{name = QName} <- Cycling]}; _ -> - false + NoCycles end end. @@ -1187,7 +1196,8 @@ handle_call(force_event_refresh, _From, handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) -> {MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC), State1 = case dtree:is_defined(QPid, UC1) of - false -> demonitor_queue(QPid, State); + false -> QMons = State#q.queue_monitors, + State#q{queue_monitors = pmon:demonitor(QPid, QMons)}; true -> State end, cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], @@ -1199,25 +1209,16 @@ handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); -handle_cast({deliver, Delivery = #delivery{sender = Sender, - msg_seq_no = MsgSeqNo}, Flow}, - State) -> +handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, + State = #q{senders = Senders}) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - case Flow of - flow -> Key = {ch_publisher, Sender}, - case get(Key) of - undefined -> put(Key, erlang:monitor(process, Sender)); - _ -> ok - end, - credit_flow:ack(Sender); - noflow -> ok - end, - case already_been_here(Delivery, State) of - false -> noreply(deliver_or_enqueue(Delivery, State)); - Qs -> log_cycle_once(Qs), - rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]), - noreply(State) - end; + Senders1 = case Flow of + flow -> credit_flow:ack(Sender), + pmon:monitor(Sender, Senders); + noflow -> Senders + end, + State1 = State#q{senders = Senders1}, + noreply(deliver_or_enqueue(Delivery, State1)); handle_cast({ack, AckTags, ChPid}, State) -> noreply(subtract_acks( diff --git a/src/rabbit_auth_backend.erl b/src/rabbit_auth_backend.erl index e0e252b8..e89951e7 100644 --- a/src/rabbit_auth_backend.erl +++ b/src/rabbit_auth_backend.erl @@ -16,42 +16,57 @@ -module(rabbit_auth_backend). +-ifdef(use_specs). + +%% A description proplist as with auth mechanisms, +%% exchanges. Currently unused. +-callback description() -> [proplist:property()]. + +%% Check a user can log in, given a username and a proplist of +%% authentication information (e.g. [{password, Password}]). +%% +%% Possible responses: +%% {ok, User} +%% Authentication succeeded, and here's the user record. +%% {error, Error} +%% Something went wrong. Log and die. +%% {refused, Msg, Args} +%% Client failed authentication. Log and die. +-callback check_user_login(rabbit_types:username(), [term()]) -> + {'ok', rabbit_types:user()} | + {'refused', string(), [any()]} | + {'error', any()}. + +%% Given #user and vhost, can a user log in to a vhost? +%% Possible responses: +%% true +%% false +%% {error, Error} +%% Something went wrong. Log and die. +-callback check_vhost_access(rabbit_types:user(), rabbit_types:vhost()) -> + boolean() | {'error', any()}. + + +%% Given #user, resource and permission, can a user access a resource? +%% +%% Possible responses: +%% true +%% false +%% {error, Error} +%% Something went wrong. Log and die. +-callback check_resource_access(rabbit_types:user(), + rabbit_types:r(atom()), + rabbit_access_control:permission_atom()) -> + boolean() | {'error', any()}. + +-else. + -export([behaviour_info/1]). behaviour_info(callbacks) -> - [ - %% A description proplist as with auth mechanisms, - %% exchanges. Currently unused. - {description, 0}, - - %% Check a user can log in, given a username and a proplist of - %% authentication information (e.g. [{password, Password}]). - %% - %% Possible responses: - %% {ok, User} - %% Authentication succeeded, and here's the user record. - %% {error, Error} - %% Something went wrong. Log and die. - %% {refused, Msg, Args} - %% Client failed authentication. Log and die. - {check_user_login, 2}, - - %% Given #user and vhost, can a user log in to a vhost? - %% Possible responses: - %% true - %% false - %% {error, Error} - %% Something went wrong. Log and die. - {check_vhost_access, 2}, - - %% Given #user, resource and permission, can a user access a resource? - %% - %% Possible responses: - %% true - %% false - %% {error, Error} - %% Something went wrong. Log and die. - {check_resource_access, 3} - ]; + [{description, 0}, {check_user_login, 2}, {check_vhost_access, 2}, + {check_resource_access, 3}]; behaviour_info(_Other) -> undefined. + +-endif. diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl index 3ef81d32..7b9df81e 100644 --- a/src/rabbit_auth_backend_internal.erl +++ b/src/rabbit_auth_backend_internal.erl @@ -32,8 +32,6 @@ vhost_perms_info_keys/0, user_perms_info_keys/0, user_vhost_perms_info_keys/0]). --include("rabbit_auth_backend_spec.hrl"). - -ifdef(use_specs). -type(regexp() :: binary()). diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl index 0c8251b8..eda6a743 100644 --- a/src/rabbit_auth_mechanism.erl +++ b/src/rabbit_auth_mechanism.erl @@ -16,31 +16,41 @@ -module(rabbit_auth_mechanism). +-ifdef(use_specs). + +%% A description. +-callback description() -> [proplist:property()]. + +%% If this mechanism is enabled, should it be offered for a given socket? +%% (primarily so EXTERNAL can be SSL-only) +-callback should_offer(rabbit_net:socket()) -> boolean(). + +%% Called before authentication starts. Should create a state +%% object to be passed through all the stages of authentication. +-callback init(rabbit_net:socket()) -> any(). + +%% Handle a stage of authentication. Possible responses: +%% {ok, User} +%% Authentication succeeded, and here's the user record. +%% {challenge, Challenge, NextState} +%% Another round is needed. Here's the state I want next time. +%% {protocol_error, Msg, Args} +%% Client got the protocol wrong. Log and die. +%% {refused, Msg, Args} +%% Client failed authentication. Log and die. +-callback handle_response(binary(), any()) -> + {'ok', rabbit_types:user()} | + {'challenge', binary(), any()} | + {'protocol_error', string(), [any()]} | + {'refused', string(), [any()]}. + +-else. + -export([behaviour_info/1]). behaviour_info(callbacks) -> - [ - %% A description. - {description, 0}, - - %% If this mechanism is enabled, should it be offered for a given socket? - %% (primarily so EXTERNAL can be SSL-only) - {should_offer, 1}, - - %% Called before authentication starts. Should create a state - %% object to be passed through all the stages of authentication. - {init, 1}, - - %% Handle a stage of authentication. Possible responses: - %% {ok, User} - %% Authentication succeeded, and here's the user record. - %% {challenge, Challenge, NextState} - %% Another round is needed. Here's the state I want next time. - %% {protocol_error, Msg, Args} - %% Client got the protocol wrong. Log and die. - %% {refused, Msg, Args} - %% Client failed authentication. Log and die. - {handle_response, 2} - ]; + [{description, 0}, {should_offer, 1}, {init, 1}, {handle_response, 2}]; behaviour_info(_Other) -> undefined. + +-endif. diff --git a/src/rabbit_auth_mechanism_amqplain.erl b/src/rabbit_auth_mechanism_amqplain.erl index 3de6e7a6..c0d86cd1 100644 --- a/src/rabbit_auth_mechanism_amqplain.erl +++ b/src/rabbit_auth_mechanism_amqplain.erl @@ -21,8 +21,6 @@ -export([description/0, should_offer/1, init/1, handle_response/2]). --include("rabbit_auth_mechanism_spec.hrl"). - -rabbit_boot_step({?MODULE, [{description, "auth mechanism amqplain"}, {mfa, {rabbit_registry, register, diff --git a/src/rabbit_auth_mechanism_cr_demo.erl b/src/rabbit_auth_mechanism_cr_demo.erl index 64b01d8e..5df1d5d7 100644 --- a/src/rabbit_auth_mechanism_cr_demo.erl +++ b/src/rabbit_auth_mechanism_cr_demo.erl @@ -21,8 +21,6 @@ -export([description/0, should_offer/1, init/1, handle_response/2]). --include("rabbit_auth_mechanism_spec.hrl"). - -rabbit_boot_step({?MODULE, [{description, "auth mechanism cr-demo"}, {mfa, {rabbit_registry, register, diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl index 19fb5875..423170e1 100644 --- a/src/rabbit_auth_mechanism_plain.erl +++ b/src/rabbit_auth_mechanism_plain.erl @@ -21,8 +21,6 @@ -export([description/0, should_offer/1, init/1, handle_response/2]). --include("rabbit_auth_mechanism_spec.hrl"). - -rabbit_boot_step({?MODULE, [{description, "auth mechanism plain"}, {mfa, {rabbit_registry, register, diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 42627aae..6cc1c3fd 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -16,164 +16,200 @@ -module(rabbit_backing_queue). +-ifdef(use_specs). + +%% We can't specify a per-queue ack/state with callback signatures +-type(ack() :: any()). +-type(state() :: any()). + +-type(fetch_result(Ack) :: + ('empty' | + %% Message, IsDelivered, AckTag, Remaining_Len + {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})). +-type(is_durable() :: boolean()). +-type(attempt_recovery() :: boolean()). +-type(purged_msg_count() :: non_neg_integer()). +-type(confirm_required() :: boolean()). +-type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). +-type(duration() :: ('undefined' | 'infinity' | number())). + +-type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') | + 'undefined'). + +%% Called on startup with a list of durable queue names. The queues +%% aren't being started at this point, but this call allows the +%% backing queue to perform any checking necessary for the consistency +%% of those queues, or initialise any other shared resources. +-callback start([rabbit_amqqueue:name()]) -> 'ok'. + +%% Called to tear down any state/resources. NB: Implementations should +%% not depend on this function being called on shutdown and instead +%% should hook into the rabbit supervision hierarchy. +-callback stop() -> 'ok'. + +%% Initialise the backing queue and its state. +%% +%% Takes +%% 1. the amqqueue record +%% 2. a boolean indicating whether the queue is an existing queue that +%% should be recovered +%% 3. an asynchronous callback which accepts a function of type +%% backing-queue-state to backing-queue-state. This callback +%% function can be safely invoked from any process, which makes it +%% useful for passing messages back into the backing queue, +%% especially as the backing queue does not have control of its own +%% mailbox. +-callback init(rabbit_types:amqqueue(), attempt_recovery(), + async_callback()) -> state(). + +%% Called on queue shutdown when queue isn't being deleted. +-callback terminate(any(), state()) -> state(). + +%% Called when the queue is terminating and needs to delete all its +%% content. +-callback delete_and_terminate(any(), state()) -> state(). + +%% Remove all messages in the queue, but not messages which have been +%% fetched and are pending acks. +-callback purge(state()) -> {purged_msg_count(), state()}. + +%% Publish a message. +-callback publish(rabbit_types:basic_message(), + rabbit_types:message_properties(), pid(), state()) -> + state(). + +%% Called for messages which have already been passed straight +%% out to a client. The queue will be empty for these calls +%% (i.e. saves the round trip through the backing queue). +-callback publish_delivered(true, rabbit_types:basic_message(), + rabbit_types:message_properties(), pid(), state()) + -> {ack(), state()}; + (false, rabbit_types:basic_message(), + rabbit_types:message_properties(), pid(), state()) + -> {undefined, state()}. + +%% Return ids of messages which have been confirmed since the last +%% invocation of this function (or initialisation). +%% +%% Message ids should only appear in the result of drain_confirmed +%% under the following circumstances: +%% +%% 1. The message appears in a call to publish_delivered/4 and the +%% first argument (ack_required) is false; or +%% 2. The message is fetched from the queue with fetch/2 and the first +%% argument (ack_required) is false; or +%% 3. The message is acked (ack/2 is called for the message); or +%% 4. The message is fully fsync'd to disk in such a way that the +%% recovery of the message is guaranteed in the event of a crash of +%% this rabbit node (excluding hardware failure). +%% +%% In addition to the above conditions, a message id may only appear +%% in the result of drain_confirmed if +%% #message_properties.needs_confirming = true when the msg was +%% published (through whichever means) to the backing queue. +%% +%% It is legal for the same message id to appear in the results of +%% multiple calls to drain_confirmed, which means that the backing +%% queue is not required to keep track of which messages it has +%% already confirmed. The confirm will be issued to the publisher the +%% first time the message id appears in the result of +%% drain_confirmed. All subsequent appearances of that message id will +%% be ignored. +-callback drain_confirmed(state()) -> {[rabbit_guid:guid()], state()}. + +%% Drop messages from the head of the queue while the supplied +%% predicate returns true. A callback function is supplied allowing +%% callers access to messages that are about to be dropped. +-callback dropwhile(fun ((rabbit_types:message_properties()) -> boolean()), msg_fun(), + state()) + -> state(). + +%% Produce the next message. +-callback fetch(true, state()) -> {fetch_result(ack()), state()}; + (false, state()) -> {fetch_result(undefined), state()}. + +%% Acktags supplied are for messages which can now be forgotten +%% about. Must return 1 msg_id per Ack, in the same order as Acks. +-callback ack([ack()], state()) -> {[rabbit_guid:guid()], state()}. + +%% Acktags supplied are for messages which should be processed. The +%% provided callback function is called with each message. +-callback fold(msg_fun(), state(), [ack()]) -> state(). + +%% Reinsert messages into the queue which have already been delivered +%% and were pending acknowledgement. +-callback requeue([ack()], state()) -> {[rabbit_guid:guid()], state()}. + +%% How long is my queue? +-callback len(state()) -> non_neg_integer(). + +%% Is my queue empty? +-callback is_empty(state()) -> boolean(). + +%% For the next three functions, the assumption is that you're +%% monitoring something like the ingress and egress rates of the +%% queue. The RAM duration is thus the length of time represented by +%% the messages held in RAM given the current rates. If you want to +%% ignore all of this stuff, then do so, and return 0 in +%% ram_duration/1. + +%% The target is to have no more messages in RAM than indicated by the +%% duration and the current queue rates. +-callback set_ram_duration_target(duration(), state()) -> state(). + +%% Optionally recalculate the duration internally (likely to be just +%% update your internal rates), and report how many seconds the +%% messages in RAM represent given the current rates of the queue. +-callback ram_duration(state()) -> {duration(), state()}. + +%% Should 'timeout' be called as soon as the queue process can manage +%% (either on an empty mailbox, or when a timer fires)? +-callback needs_timeout(state()) -> 'false' | 'timed' | 'idle'. + +%% Called (eventually) after needs_timeout returns 'idle' or 'timed'. +%% Note this may be called more than once for each 'idle' or 'timed' +%% returned from needs_timeout +-callback timeout(state()) -> state(). + +%% Called immediately before the queue hibernates. +-callback handle_pre_hibernate(state()) -> state(). + +%% Exists for debugging purposes, to be able to expose state via +%% rabbitmqctl list_queues backing_queue_status +-callback status(state()) -> [{atom(), any()}]. + +%% Passed a function to be invoked with the relevant backing queue's +%% state. Useful for when the backing queue or other components need +%% to pass functions into the backing queue. +-callback invoke(atom(), fun ((atom(), A) -> A), state()) -> state(). + +%% Called prior to a publish or publish_delivered call. Allows the BQ +%% to signal that it's already seen this message (and in what capacity +%% - i.e. was it published previously or discarded previously) and +%% thus the message should be dropped. +-callback is_duplicate(rabbit_types:basic_message(), state()) + -> {'false'|'published'|'discarded', state()}. + +%% Called to inform the BQ about messages which have reached the +%% queue, but are not going to be further passed to BQ for some +%% reason. Note that this is may be invoked for messages for which +%% BQ:is_duplicate/2 has already returned {'published' | 'discarded', +%% BQS}. +-callback discard(rabbit_types:basic_message(), pid(), state()) -> state(). + +-else. + -export([behaviour_info/1]). behaviour_info(callbacks) -> - [ - %% Called on startup with a list of durable queue names. The - %% queues aren't being started at this point, but this call - %% allows the backing queue to perform any checking necessary for - %% the consistency of those queues, or initialise any other - %% shared resources. - {start, 1}, - - %% Called to tear down any state/resources. NB: Implementations - %% should not depend on this function being called on shutdown - %% and instead should hook into the rabbit supervision hierarchy. - {stop, 0}, - - %% Initialise the backing queue and its state. - %% - %% Takes - %% 1. the amqqueue record - %% 2. a boolean indicating whether the queue is an existing queue - %% that should be recovered - %% 3. an asynchronous callback which accepts a function of type - %% backing-queue-state to backing-queue-state. This callback - %% function can be safely invoked from any process, which - %% makes it useful for passing messages back into the backing - %% queue, especially as the backing queue does not have - %% control of its own mailbox. - {init, 3}, - - %% Called on queue shutdown when queue isn't being deleted. - {terminate, 2}, - - %% Called when the queue is terminating and needs to delete all - %% its content. - {delete_and_terminate, 2}, - - %% Remove all messages in the queue, but not messages which have - %% been fetched and are pending acks. - {purge, 1}, - - %% Publish a message. - {publish, 4}, - - %% Called for messages which have already been passed straight - %% out to a client. The queue will be empty for these calls - %% (i.e. saves the round trip through the backing queue). - {publish_delivered, 5}, - - %% Return ids of messages which have been confirmed since - %% the last invocation of this function (or initialisation). - %% - %% Message ids should only appear in the result of - %% drain_confirmed under the following circumstances: - %% - %% 1. The message appears in a call to publish_delivered/4 and - %% the first argument (ack_required) is false; or - %% 2. The message is fetched from the queue with fetch/2 and the - %% first argument (ack_required) is false; or - %% 3. The message is acked (ack/2 is called for the message); or - %% 4. The message is fully fsync'd to disk in such a way that the - %% recovery of the message is guaranteed in the event of a - %% crash of this rabbit node (excluding hardware failure). - %% - %% In addition to the above conditions, a message id may only - %% appear in the result of drain_confirmed if - %% #message_properties.needs_confirming = true when the msg was - %% published (through whichever means) to the backing queue. - %% - %% It is legal for the same message id to appear in the results - %% of multiple calls to drain_confirmed, which means that the - %% backing queue is not required to keep track of which messages - %% it has already confirmed. The confirm will be issued to the - %% publisher the first time the message id appears in the result - %% of drain_confirmed. All subsequent appearances of that message - %% id will be ignored. - {drain_confirmed, 1}, - - %% Drop messages from the head of the queue while the supplied - %% predicate returns true. A callback function is supplied - %% allowing callers access to messages that are about to be - %% dropped. - {dropwhile, 3}, - - %% Produce the next message. - {fetch, 2}, - - %% Acktags supplied are for messages which can now be forgotten - %% about. Must return 1 msg_id per Ack, in the same order as - %% Acks. - {ack, 2}, - - %% Acktags supplied are for messages which should be - %% processed. The provided callback function is called with each - %% message. - {fold, 3}, - - %% Reinsert messages into the queue which have already been - %% delivered and were pending acknowledgement. - {requeue, 2}, - - %% How long is my queue? - {len, 1}, - - %% Is my queue empty? - {is_empty, 1}, - - %% For the next three functions, the assumption is that you're - %% monitoring something like the ingress and egress rates of the - %% queue. The RAM duration is thus the length of time represented - %% by the messages held in RAM given the current rates. If you - %% want to ignore all of this stuff, then do so, and return 0 in - %% ram_duration/1. - - %% The target is to have no more messages in RAM than indicated - %% by the duration and the current queue rates. - {set_ram_duration_target, 2}, - - %% Optionally recalculate the duration internally (likely to be - %% just update your internal rates), and report how many seconds - %% the messages in RAM represent given the current rates of the - %% queue. - {ram_duration, 1}, - - %% Should 'timeout' be called as soon as the queue process - %% can manage (either on an empty mailbox, or when a timer - %% fires)? - {needs_timeout, 1}, - - %% Called (eventually) after needs_timeout returns 'idle' or - %% 'timed'. Note this may be called more than once for each - %% 'idle' or 'timed' returned from needs_timeout. - {timeout, 1}, - - %% Called immediately before the queue hibernates. - {handle_pre_hibernate, 1}, - - %% Exists for debugging purposes, to be able to expose state via - %% rabbitmqctl list_queues backing_queue_status - {status, 1}, - - %% Passed a function to be invoked with the relevant backing - %% queue's state. Useful for when the backing queue or other - %% components need to pass functions into the backing queue. - {invoke, 3}, - - %% Called prior to a publish or publish_delivered call. Allows - %% the BQ to signal that it's already seen this message (and in - %% what capacity - i.e. was it published previously or discarded - %% previously) and thus the message should be dropped. - {is_duplicate, 2}, - - %% Called to inform the BQ about messages which have reached the - %% queue, but are not going to be further passed to BQ for some - %% reason. Note that this is may be invoked for messages for - %% which BQ:is_duplicate/2 has already returned {'published' | - %% 'discarded', BQS}. - {discard, 3} - ]; + [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2}, + {delete_and_terminate, 2}, {purge, 1}, {publish, 4}, + {publish_delivered, 5}, {drain_confirmed, 1}, {dropwhile, 3}, + {fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1}, + {is_empty, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, + {needs_timeout, 1}, {timeout, 1}, {handle_pre_hibernate, 1}, + {status, 1}, {invoke, 3}, {is_duplicate, 2}, {discard, 3}]; behaviour_info(_Other) -> undefined. + +-endif. diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index 7b00fa5f..286b69e4 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -141,7 +141,7 @@ qc_drain_confirmed(#state{bqstate = BQ}) -> {call, ?BQMOD, drain_confirmed, [BQ]}. qc_dropwhile(#state{bqstate = BQ}) -> - {call, ?BQMOD, dropwhile, [fun dropfun/1, BQ]}. + {call, ?BQMOD, dropwhile, [fun dropfun/1, fun (_,_) -> ok end, BQ]}. qc_is_empty(#state{bqstate = BQ}) -> {call, ?BQMOD, is_empty, [BQ]}. diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index cc876cb4..8ad59016 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -20,7 +20,7 @@ -export([publish/4, publish/6, publish/1, message/3, message/4, properties/1, append_table_header/3, - map_headers/2, delivery/4, header_routes/1]). + extract_headers/1, map_headers/2, delivery/4, header_routes/1]). -export([build_content/2, from_content/1]). %%---------------------------------------------------------------------------- @@ -61,6 +61,8 @@ -spec(append_table_header/3 :: (binary(), rabbit_framing:amqp_table(), headers()) -> headers()). +-spec(extract_headers/1 :: (rabbit_types:content()) -> headers()). + -spec(map_headers/2 :: (rabbit_types:content(), fun((headers()) -> headers())) -> rabbit_types:content()). @@ -186,6 +188,11 @@ append_table_header(Name, Info, Headers) -> end, rabbit_misc:set_table_value(Headers, Name, array, [{table, Info} | Prior]). +extract_headers(Content) -> + #content{properties = #'P_basic'{headers = Headers}} = + rabbit_binary_parser:ensure_content_decoded(Content), + Headers. + map_headers(F, Content) -> Content1 = rabbit_binary_parser:ensure_content_decoded(Content), #content{properties = #'P_basic'{headers = Headers} = Props} = Content1, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 4a0e93be..846890a1 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -194,7 +194,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, - queue_monitors = sets:new(), + queue_monitors = pmon:new(), consumer_mapping = dict:new(), blocking = sets:new(), queue_consumers = dict:new(), @@ -333,8 +333,8 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State3 = handle_consuming_queue_down(QPid, State2), credit_flow:peer_down(QPid), erase_queue_stats(QPid), - noreply(State3#ch{queue_monitors = - sets:del_element(QPid, State3#ch.queue_monitors)}); + noreply(State3#ch{queue_monitors = pmon:erase( + QPid, State3#ch.queue_monitors)}); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. @@ -758,9 +758,7 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, fun () -> {error, not_found} end, fun () -> rabbit_amqqueue:basic_cancel( - Q, self(), ConsumerTag, - ok_msg(NoWait, #'basic.cancel_ok'{ - consumer_tag = ConsumerTag})) + Q, self(), ConsumerTag, ok_msg(NoWait, OkMsg)) end) of ok -> {noreply, NewState}; @@ -937,7 +935,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, {error, not_found} -> case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args, Owner) of - {new, Q = #amqqueue{}} -> + {new, #amqqueue{pid = QPid}} -> %% We need to notify the reader within the channel %% process so that we can be sure there are no %% outstanding exclusive queues being declared as @@ -945,7 +943,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, ok = case Owner of none -> ok; _ -> rabbit_queue_collector:register( - CollectorPid, Q) + CollectorPid, QPid) end, return_queue_declare_ok(QueueName, NoWait, 0, 0, State); {existing, _Q} -> @@ -1091,6 +1089,7 @@ handle_method(_MethodRecord, _Content, _State) -> consumer_monitor(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, + queue_monitors = QMons, queue_consumers = QCons, capabilities = Capabilities}) -> case rabbit_misc:table_lookup( @@ -1103,24 +1102,19 @@ consumer_monitor(ConsumerTag, end, gb_sets:singleton(ConsumerTag), QCons), - monitor_queue(QPid, State#ch{queue_consumers = QCons1}); + State#ch{queue_monitors = pmon:monitor(QPid, QMons), + queue_consumers = QCons1}; _ -> State end. -monitor_queue(QPid, State = #ch{queue_monitors = QMons}) -> - case sets:is_element(QPid, QMons) of - false -> erlang:monitor(process, QPid), - State#ch{queue_monitors = sets:add_element(QPid, QMons)}; - true -> State - end. - handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) -> - {MXs, UC1} = dtree:take(QPid, UC), - (case rabbit_misc:is_abnormal_termination(Reason) of - true -> fun send_nacks/2; - false -> fun record_confirms/2 - end)(MXs, State#ch{unconfirmed = UC1}). + case rabbit_misc:is_abnormal_termination(Reason) of + true -> {MXs, UC1} = dtree:take_all(QPid, UC), + send_nacks(MXs, State#ch{unconfirmed = UC1}); + false -> {MXs, UC1} = dtree:take(QPid, UC), + record_confirms(MXs, State#ch{unconfirmed = UC1}) + end. handle_consuming_queue_down(QPid, State = #ch{consumer_mapping = ConsumerMapping, @@ -1323,7 +1317,9 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ QNames}, State) -> {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery), - State1 = lists:foldl(fun monitor_queue/2, State, DeliveredQPids), + State1 = State#ch{queue_monitors = + pmon:monitor_all(DeliveredQPids, + State#ch.queue_monitors)}, State2 = process_routing_result(RoutingRes, DeliveredQPids, XName, MsgSeqNo, Message, State1), maybe_incr_stats([{XName, 1} | diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 83e28c44..910a89b4 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -242,6 +242,11 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). +%% Optimisation +route(#exchange{name = #resource{name = <<"">>, virtual_host = VHost}}, + #delivery{message = #basic_message{routing_keys = RKs}}) -> + [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)]; + route(X = #exchange{name = XName}, Delivery) -> route1(Delivery, {queue:from_list([X]), XName, []}). diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index 44a08e24..1027570c 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -16,39 +16,57 @@ -module(rabbit_exchange_type). --export([behaviour_info/1]). +-ifdef(use_specs). -behaviour_info(callbacks) -> - [ - {description, 0}, +-type(tx() :: 'transaction' | 'none'). +-type(serial() :: pos_integer() | tx()). + +-callback description() -> [proplist:property()]. + +%% Should Rabbit ensure that all binding events that are +%% delivered to an individual exchange can be serialised? (they +%% might still be delivered out of order, but there'll be a +%% serial number). +-callback serialise_events() -> boolean(). - %% Should Rabbit ensure that all binding events that are - %% delivered to an individual exchange can be serialised? (they - %% might still be delivered out of order, but there'll be a - %% serial number). - {serialise_events, 0}, +%% The no_return is there so that we can have an "invalid" exchange +%% type (see rabbit_exchange_type_invalid). +-callback route(rabbit_types:exchange(), rabbit_types:delivery()) -> + rabbit_router:match_result(). - {route, 2}, +%% called BEFORE declaration, to check args etc; may exit with #amqp_error{} +-callback validate(rabbit_types:exchange()) -> 'ok'. - %% called BEFORE declaration, to check args etc; may exit with #amqp_error{} - {validate, 1}, +%% called after declaration and recovery +-callback create(tx(), rabbit_types:exchange()) -> 'ok'. - %% called after declaration and recovery - {create, 2}, +%% called after exchange (auto)deletion. +-callback delete(tx(), rabbit_types:exchange(), [rabbit_types:binding()]) -> + 'ok'. - %% called after exchange (auto)deletion. - {delete, 3}, +%% called after a binding has been added or recovered +-callback add_binding(serial(), rabbit_types:exchange(), + rabbit_types:binding()) -> 'ok'. - %% called after a binding has been added or recovered - {add_binding, 3}, +%% called after bindings have been deleted. +-callback remove_bindings(serial(), rabbit_types:exchange(), + [rabbit_types:binding()]) -> 'ok'. - %% called after bindings have been deleted. - {remove_bindings, 3}, +%% called when comparing exchanges for equivalence - should return ok or +%% exit with #amqp_error{} +-callback assert_args_equivalence (rabbit_types:exchange(), + rabbit_framing:amqp_table()) -> + 'ok' | rabbit_types:connection_exit(). - %% called when comparing exchanges for equivalence - should return ok or - %% exit with #amqp_error{} - {assert_args_equivalence, 2} +-else. - ]; +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [{description, 0}, {serialise_events, 0}, {route, 2}, {validate, 1}, + {create, 2}, {delete, 3}, {add_binding, 3}, {remove_bindings, 3}, + {assert_args_equivalence, 2}]; behaviour_info(_Other) -> undefined. + +-endif. diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index 4bce42d4..cdec1cb9 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -22,7 +22,6 @@ -export([description/0, serialise_events/0, route/2]). -export([validate/1, create/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). --include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, [{description, "exchange type direct"}, diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index cc3fb87c..a64f2c29 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -22,7 +22,6 @@ -export([description/0, serialise_events/0, route/2]). -export([validate/1, create/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). --include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, [{description, "exchange type fanout"}, diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index de9979b4..61917d8f 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -23,7 +23,6 @@ -export([description/0, serialise_events/0, route/2]). -export([validate/1, create/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). --include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, [{description, "exchange type headers"}, diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl index 8f60f7d8..82d27960 100644 --- a/src/rabbit_exchange_type_invalid.erl +++ b/src/rabbit_exchange_type_invalid.erl @@ -22,7 +22,6 @@ -export([description/0, serialise_events/0, route/2]). -export([validate/1, create/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). --include("rabbit_exchange_type_spec.hrl"). description() -> [{name, <<"invalid">>}, diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 84f4f8a9..3160fdf4 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -23,7 +23,6 @@ -export([description/0, serialise_events/0, route/2]). -export([validate/1, create/2, delete/3, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). --include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, [{description, "exchange type topic"}, diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index d0b5bab7..2d155d14 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -328,7 +328,7 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) -> ensure_gm_heartbeat(), {ok, #state { q = Q, gm = GM1, - monitors = dict:new(), + monitors = pmon:new(), death_fun = DeathFun, length_fun = LengthFun }, hibernate, @@ -353,17 +353,8 @@ handle_cast(request_length, State = #state { length_fun = LengthFun }) -> ok = LengthFun(), noreply(State); -handle_cast({ensure_monitoring, Pids}, - State = #state { monitors = Monitors }) -> - Monitors1 = - lists:foldl(fun (Pid, MonitorsN) -> - case dict:is_key(Pid, MonitorsN) of - true -> MonitorsN; - false -> MRef = erlang:monitor(process, Pid), - dict:store(Pid, MRef, MonitorsN) - end - end, Monitors, Pids), - noreply(State #state { monitors = Monitors1 }). +handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) -> + noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }). handle_info(send_gm_heartbeat, State = #state{gm = GM}) -> gm:broadcast(GM, heartbeat), @@ -371,12 +362,12 @@ handle_info(send_gm_heartbeat, State = #state{gm = GM}) -> noreply(State); handle_info({'DOWN', _MonitorRef, process, Pid, _Reason}, - State = #state { monitors = Monitors, + State = #state { monitors = Mons, death_fun = DeathFun }) -> - noreply(case dict:is_key(Pid, Monitors) of + noreply(case pmon:is_monitored(Pid, Mons) of false -> State; true -> ok = DeathFun(Pid), - State #state { monitors = dict:erase(Pid, Monitors) } + State #state { monitors = pmon:erase(Pid, Mons) } end); handle_info(Msg, State) -> diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index daeb7d85..1cb58569 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -59,10 +59,6 @@ known_senders :: set() }). --type(ack() :: non_neg_integer()). --type(state() :: master_state()). --include("rabbit_backing_queue_spec.hrl"). - -spec(promote_backing_queue_state/6 :: (pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()). -spec(sender_death_fun/0 :: () -> death_fun()). @@ -177,11 +173,12 @@ dropwhile(Pred, MsgFun, backing_queue = BQ, set_delivered = SetDelivered, backing_queue_state = BQS }) -> - Len = BQ:len(BQS), + Len = BQ:len(BQS), BQS1 = BQ:dropwhile(Pred, MsgFun, BQS), - Dropped = Len - BQ:len(BQS1), + Len1 = BQ:len(BQS1), + ok = gm:broadcast(GM, {set_length, Len1}), + Dropped = Len - Len1, SetDelivered1 = lists:max([0, SetDelivered - Dropped]), - ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}), State #state { backing_queue_state = BQS1, set_delivered = SetDelivered1 }. @@ -241,11 +238,11 @@ ack(AckTags, State = #state { gm = GM, backing_queue_state = BQS, ack_msg_id = AM }) -> {MsgIds, BQS1} = BQ:ack(AckTags, BQS), - AM1 = lists:foldl(fun dict:erase/2, AM, AckTags), case MsgIds of [] -> ok; _ -> ok = gm:broadcast(GM, {ack, MsgIds}) end, + AM1 = lists:foldl(fun dict:erase/2, AM, AckTags), {MsgIds, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 }}. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index eb1da1e8..a7a1273d 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -140,7 +140,7 @@ init(#amqqueue { name = QueueName } = Q) -> ack_num = 0, msg_id_status = dict:new(), - known_senders = dict:new(), + known_senders = pmon:new(), synchronised = false }, @@ -286,7 +286,7 @@ terminate(Reason, #state { q = Q, rate_timer_ref = RateTRef }) -> ok = gm:leave(GM), QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( - Q, BQ, BQS, RateTRef, [], [], dict:new()), + Q, BQ, BQS, RateTRef, [], [], pmon:new(), dict:new()), rabbit_amqqueue_process:terminate(Reason, QueueState); terminate([_SPid], _Reason) -> %% gm case @@ -459,12 +459,8 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, %% Everything that we're monitoring, we need to ensure our new %% coordinator is monitoring. - - MonitoringPids = [begin put({ch_publisher, Pid}, MRef), - Pid - end || {Pid, MRef} <- dict:to_list(KS)], - ok = rabbit_mirror_queue_coordinator:ensure_monitoring( - CPid, MonitoringPids), + MPids = pmon:monitored(KS), + ok = rabbit_mirror_queue_coordinator:ensure_monitoring(CPid, MPids), %% We find all the messages that we've received from channels but %% not from gm, and if they're due to be enqueued on promotion @@ -537,7 +533,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, Status =:= published orelse Status =:= confirmed]), MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( - CPid, BQ, BQS, GM, SS, MonitoringPids), + CPid, BQ, BQS, GM, SS, MPids), MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) -> gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0); @@ -550,7 +546,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, {Delivery, true} <- queue:to_list(PubQ)], QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( Q1, rabbit_mirror_queue_master, MasterState, RateTRef, - AckTags, Deliveries, MTC), + AckTags, Deliveries, KS, MTC), {become, rabbit_amqqueue_process, QueueState, hibernate}. noreply(State) -> @@ -605,14 +601,10 @@ stop_rate_timer(State = #state { rate_timer_ref = TRef }) -> State #state { rate_timer_ref = undefined }. ensure_monitoring(ChPid, State = #state { known_senders = KS }) -> - case dict:is_key(ChPid, KS) of - true -> State; - false -> MRef = erlang:monitor(process, ChPid), - State #state { known_senders = dict:store(ChPid, MRef, KS) } - end. + State #state { known_senders = pmon:monitor(ChPid, KS) }. local_sender_death(ChPid, State = #state { known_senders = KS }) -> - ok = case dict:is_key(ChPid, KS) of + ok = case pmon:is_monitored(ChPid, KS) of false -> ok; true -> credit_flow:peer_down(ChPid), confirm_sender_death(ChPid) @@ -628,7 +620,7 @@ confirm_sender_death(Pid) -> fun (?MODULE, State = #state { known_senders = KS, gm = GM }) -> %% We're running still as a slave - ok = case dict:is_key(Pid, KS) of + ok = case pmon:is_monitored(Pid, KS) of false -> ok; true -> gm:broadcast(GM, {ensure_monitoring, [Pid]}), confirm_sender_death(Pid) @@ -866,21 +858,18 @@ process_instruction({sender_death, ChPid}, State = #state { sender_queues = SQ, msg_id_status = MS, known_senders = KS }) -> - {ok, case dict:find(ChPid, KS) of - error -> - State; - {ok, MRef} -> - true = erlang:demonitor(MRef), - MS1 = case dict:find(ChPid, SQ) of - error -> - MS; - {ok, {_MQ, PendingCh}} -> - lists:foldl(fun dict:erase/2, MS, - sets:to_list(PendingCh)) - end, - State #state { sender_queues = dict:erase(ChPid, SQ), - msg_id_status = MS1, - known_senders = dict:erase(ChPid, KS) } + {ok, case pmon:is_monitored(ChPid, KS) of + false -> State; + true -> MS1 = case dict:find(ChPid, SQ) of + error -> + MS; + {ok, {_MQ, PendingCh}} -> + lists:foldl(fun dict:erase/2, MS, + sets:to_list(PendingCh)) + end, + State #state { sender_queues = dict:erase(ChPid, SQ), + msg_id_status = MS1, + known_senders = pmon:demonitor(ChPid, KS) } end}; process_instruction({length, Length}, State = #state { backing_queue = BQ, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index c1be7613..0aacd654 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -60,6 +60,7 @@ -export([multi_call/2]). -export([quit/1]). -export([os_cmd/1]). +-export([gb_sets_difference/2]). %%---------------------------------------------------------------------------- @@ -204,6 +205,7 @@ ([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}). -spec(quit/1 :: (integer() | string()) -> no_return()). -spec(os_cmd/1 :: (string()) -> string()). +-spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()). -endif. @@ -912,3 +914,6 @@ os_cmd(Command) -> false -> throw({command_not_found, Exec}); _ -> os:cmd(Command) end. + +gb_sets_difference(S1, S2) -> + gb_sets:fold(fun gb_sets:delete_any/2, S1, S2). diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 56265136..d69dad1f 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -1240,7 +1240,8 @@ client_confirm(CRef, MsgIds, ActionTaken, State) -> case dict:find(CRef, CTM) of {ok, Gs} -> MsgOnDiskFun(gb_sets:intersection(Gs, MsgIds), ActionTaken), - MsgIds1 = gb_sets:difference(Gs, MsgIds), + MsgIds1 = rabbit_misc:gb_sets_difference( + Gs, MsgIds), case gb_sets:is_empty(MsgIds1) of true -> dict:erase(CRef, CTM); false -> dict:store(CRef, MsgIds1, CTM) diff --git a/src/rabbit_msg_store_ets_index.erl b/src/rabbit_msg_store_ets_index.erl index 9c31439f..3defeaaf 100644 --- a/src/rabbit_msg_store_ets_index.erl +++ b/src/rabbit_msg_store_ets_index.erl @@ -16,6 +16,8 @@ -module(rabbit_msg_store_ets_index). +-include("rabbit_msg_store.hrl"). + -behaviour(rabbit_msg_store_index). -export([new/1, recover/1, @@ -25,8 +27,6 @@ -define(MSG_LOC_NAME, rabbit_msg_store_ets_index). -define(FILENAME, "msg_store_index.ets"). --include("rabbit_msg_store_index.hrl"). - -record(state, { table, dir }). new(Dir) -> diff --git a/src/rabbit_msg_store_index.erl b/src/rabbit_msg_store_index.erl index 2f36256c..6cc0b2a7 100644 --- a/src/rabbit_msg_store_index.erl +++ b/src/rabbit_msg_store_index.erl @@ -16,6 +16,31 @@ -module(rabbit_msg_store_index). +-include("rabbit_msg_store.hrl"). + +-ifdef(use_specs). + +-type(dir() :: any()). +-type(index_state() :: any()). +-type(keyvalue() :: any()). +-type(fieldpos() :: non_neg_integer()). +-type(fieldvalue() :: any()). + +-callback new(dir()) -> index_state(). +-callback recover(dir()) -> rabbit_types:ok_or_error2(index_state(), any()). +-callback lookup(rabbit_types:msg_id(), index_state()) -> ('not_found' | keyvalue()). +-callback insert(keyvalue(), index_state()) -> 'ok'. +-callback update(keyvalue(), index_state()) -> 'ok'. +-callback update_fields(rabbit_types:msg_id(), ({fieldpos(), fieldvalue()} | + [{fieldpos(), fieldvalue()}]), + index_state()) -> 'ok'. +-callback delete(rabbit_types:msg_id(), index_state()) -> 'ok'. +-callback delete_object(keyvalue(), index_state()) -> 'ok'. +-callback delete_by_file(fieldvalue(), index_state()) -> 'ok'. +-callback terminate(index_state()) -> any(). + +-else. + -export([behaviour_info/1]). behaviour_info(callbacks) -> @@ -30,3 +55,5 @@ behaviour_info(callbacks) -> {terminate, 1}]; behaviour_info(_Other) -> undefined. + +-endif. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 825d1bb1..f0c75d23 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -446,16 +446,19 @@ ipv6_status(TestPort) -> {ok, LSock4} -> gen_tcp:close(LSock4), single_stack; %% IPv6-only machine. Welcome to the future. - {error, eafnosupport} -> ipv6_only; + {error, eafnosupport} -> ipv6_only; %% Linux + {error, eprotonosupport}-> ipv6_only; %% FreeBSD %% Dual stack machine with something already %% on IPv4. {error, _} -> ipv6_status(TestPort + 1) end end; - {error, eafnosupport} -> - %% IPv4-only machine. Welcome to the 90s. + %% IPv4-only machine. Welcome to the 90s. + {error, eafnosupport} -> %% Linux ipv4_only; + {error, eprotonosupport} -> %% FreeBSD + ipv4_only; + %% Port in use {error, _} -> - %% Port in use ipv6_status(TestPort + 1) end. diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl index df957d88..6dad01cc 100644 --- a/src/rabbit_queue_collector.erl +++ b/src/rabbit_queue_collector.erl @@ -23,7 +23,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {queues, delete_from}). +-record(state, {monitors, delete_from}). -include("rabbit.hrl"). @@ -32,7 +32,7 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(register/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok'). +-spec(register/2 :: (pid(), pid()) -> 'ok'). -spec(delete_all/1 :: (pid()) -> 'ok'). -endif. @@ -51,39 +51,37 @@ delete_all(CollectorPid) -> %%---------------------------------------------------------------------------- init([]) -> - {ok, #state{queues = dict:new(), delete_from = undefined}}. + {ok, #state{monitors = pmon:new(), delete_from = undefined}}. %%-------------------------------------------------------------------------- -handle_call({register, Q}, _From, - State = #state{queues = Queues, delete_from = Deleting}) -> - MonitorRef = erlang:monitor(process, Q#amqqueue.pid), +handle_call({register, QPid}, _From, + State = #state{monitors = QMons, delete_from = Deleting}) -> case Deleting of undefined -> ok; - _ -> rabbit_amqqueue:delete_immediately(Q) + _ -> ok = rabbit_amqqueue:delete_immediately([QPid]) end, - {reply, ok, State#state{queues = dict:store(MonitorRef, Q, Queues)}}; + {reply, ok, State#state{monitors = pmon:monitor(QPid, QMons)}}; -handle_call(delete_all, From, State = #state{queues = Queues, +handle_call(delete_all, From, State = #state{monitors = QMons, delete_from = undefined}) -> - case dict:size(Queues) of - 0 -> {reply, ok, State#state{delete_from = From}}; - _ -> [rabbit_amqqueue:delete_immediately(Q) - || {_MRef, Q} <- dict:to_list(Queues)], - {noreply, State#state{delete_from = From}} + case pmon:monitored(QMons) of + [] -> {reply, ok, State#state{delete_from = From}}; + QPids -> ok = rabbit_amqqueue:delete_immediately(QPids), + {noreply, State#state{delete_from = From}} end. handle_cast(Msg, State) -> {stop, {unhandled_cast, Msg}, State}. -handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason}, - State = #state{queues = Queues, delete_from = Deleting}) -> - Queues1 = dict:erase(MonitorRef, Queues), - case Deleting =/= undefined andalso dict:size(Queues1) =:= 0 of +handle_info({'DOWN', _MRef, process, DownPid, _Reason}, + State = #state{monitors = QMons, delete_from = Deleting}) -> + QMons1 = pmon:erase(DownPid, QMons), + case Deleting =/= undefined andalso pmon:is_empty(QMons1) of true -> gen_server:reply(Deleting, ok); false -> ok end, - {noreply, State#state{queues = Queues1}}. + {noreply, State#state{monitors = QMons1}}. terminate(_Reason, _State) -> ok. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index e356d7ff..c74b8d5f 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2306,8 +2306,8 @@ wait_for_confirms(Unconfirmed) -> true -> ok; false -> receive {'$gen_cast', {confirm, Confirmed, _}} -> wait_for_confirms( - gb_sets:difference(Unconfirmed, - gb_sets:from_list(Confirmed))) + rabbit_misc:gb_sets_difference( + Unconfirmed, gb_sets:from_list(Confirmed))) after 5000 -> exit(timeout_waiting_for_confirm) end end. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 46f6d6c1..f9315c5d 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -351,7 +351,7 @@ durable :: boolean(), transient_threshold :: non_neg_integer(), - async_callback :: async_callback(), + async_callback :: rabbit_backing_queue:async_callback(), len :: non_neg_integer(), persistent_count :: non_neg_integer(), @@ -370,8 +370,6 @@ ack_in_counter :: non_neg_integer(), ack_rates :: rates() }). --include("rabbit_backing_queue_spec.hrl"). - -spec(multiple_routing_keys/0 :: () -> 'ok'). -endif. @@ -591,10 +589,10 @@ dropwhile(Pred, MsgFun, State) -> {_, State2} = internal_fetch(false, MsgStatus, State1), dropwhile(Pred, MsgFun, State2); {true, _} -> - {{_, _, AckTag, _}, State2} = - internal_fetch(true, MsgStatus, State1), - {MsgStatus, State3} = read_msg(MsgStatus, State2), - MsgFun(MsgStatus#msg_status.msg, AckTag), + {MsgStatus1, State2} = read_msg(MsgStatus, State1), + {{Msg, _, AckTag, _}, State3} = + internal_fetch(true, MsgStatus1, State2), + MsgFun(Msg, AckTag), dropwhile(Pred, MsgFun, State3); {false, _} -> a(in_r(MsgStatus, State1)) @@ -1291,10 +1289,11 @@ record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, unconfirmed = UC, confirmed = C }) -> - State #vqstate { msgs_on_disk = gb_sets:difference(MOD, MsgIdSet), - msg_indices_on_disk = gb_sets:difference(MIOD, MsgIdSet), - unconfirmed = gb_sets:difference(UC, MsgIdSet), - confirmed = gb_sets:union (C, MsgIdSet) }. + State #vqstate { + msgs_on_disk = rabbit_misc:gb_sets_difference(MOD, MsgIdSet), + msg_indices_on_disk = rabbit_misc:gb_sets_difference(MIOD, MsgIdSet), + unconfirmed = rabbit_misc:gb_sets_difference(UC, MsgIdSet), + confirmed = gb_sets:union(C, MsgIdSet) }. must_sync_index(#vqstate { msg_indices_on_disk = MIOD, unconfirmed = UC }) -> diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 8dd8aba8..f1b74878 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -81,8 +81,6 @@ which_children/1, find_child/2, check_childspecs/1]). --export([behaviour_info/1]). - %% Internal exports -export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3]). -export([handle_cast/2]). @@ -112,11 +110,144 @@ -define(is_terminate_simple(State), State#state.strategy =:= simple_one_for_one_terminate). +-ifdef(use_specs). + +%%-------------------------------------------------------------------------- +%% Types +%%-------------------------------------------------------------------------- + +-export_type([child_spec/0, startchild_ret/0, strategy/0, sup_name/0]). + +-type child() :: 'undefined' | pid(). +-type child_id() :: term(). +-type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | undefined}. +-type modules() :: [module()] | 'dynamic'. +-type delay() :: non_neg_integer(). +-type restart() :: 'permanent' | 'transient' | 'temporary' | 'intrinsic' + | {'permanent', delay()} | {'transient', delay()} + | {'intrinsic', delay()}. +-type shutdown() :: 'brutal_kill' | timeout(). +-type worker() :: 'worker' | 'supervisor'. +-type sup_name() :: {'local', Name :: atom()} | {'global', Name :: atom()}. +-type sup_ref() :: (Name :: atom()) + | {Name :: atom(), Node :: node()} + | {'global', Name :: atom()} + | pid(). +-type child_spec() :: {Id :: child_id(), + StartFunc :: mfargs(), + Restart :: restart(), + Shutdown :: shutdown(), + Type :: worker(), + Modules :: modules()}. + + +-type strategy() :: 'one_for_all' | 'one_for_one' + | 'rest_for_one' | 'simple_one_for_one' + | 'simple_one_for_one_terminate'. + +-type child_rec() :: #child{pid :: child() | {restarting,pid()} | [pid()], + name :: child_id(), + mfa :: mfargs(), + restart_type :: restart(), + shutdown :: shutdown(), + child_type :: worker(), + modules :: modules()}. + +-type state() :: #state{strategy :: strategy(), + children :: [child_rec()], + dynamics :: ?DICT(), + intensity :: non_neg_integer(), + period :: pos_integer()}. + +%%-------------------------------------------------------------------------- +%% Callback behaviour +%%-------------------------------------------------------------------------- + +-callback init(Args :: term()) -> + {ok, {{RestartStrategy :: strategy(), + MaxR :: non_neg_integer(), + MaxT :: non_neg_integer()}, + [ChildSpec :: child_spec()]}} + | ignore. + +%%-------------------------------------------------------------------------- +%% Specs +%%-------------------------------------------------------------------------- + +-type startchild_err() :: 'already_present' + | {'already_started', Child :: child()} | term(). +-type startchild_ret() :: {'ok', Child :: child()} + | {'ok', Child :: child(), Info :: term()} + | {'error', startchild_err()}. + +-spec start_child(SupRef, ChildSpec) -> startchild_ret() when + SupRef :: sup_ref(), + ChildSpec :: child_spec() | (List :: [term()]). + +-spec restart_child(SupRef, Id) -> Result when + SupRef :: sup_ref(), + Id :: child_id(), + Result :: {'ok', Child :: child()} + | {'ok', Child :: child(), Info :: term()} + | {'error', Error}, + Error :: 'running' | 'not_found' | 'simple_one_for_one' | term(). + +-spec delete_child(SupRef, Id) -> Result when + SupRef :: sup_ref(), + Id :: child_id(), + Result :: 'ok' | {'error', Error}, + Error :: 'running' | 'not_found' | 'simple_one_for_one'. + +-spec terminate_child(SupRef, Id) -> Result when + SupRef :: sup_ref(), + Id :: pid() | child_id(), + Result :: 'ok' | {'error', Error}, + Error :: 'not_found' | 'simple_one_for_one'. + +-spec which_children(SupRef) -> [{Id,Child,Type,Modules}] when + SupRef :: sup_ref(), + Id :: child_id() | 'undefined', + Child :: child(), + Type :: worker(), + Modules :: modules(). + +-spec check_childspecs(ChildSpecs) -> Result when + ChildSpecs :: [child_spec()], + Result :: 'ok' | {'error', Error :: term()}. + +-type init_sup_name() :: sup_name() | 'self'. + +-type stop_rsn() :: 'shutdown' | {'bad_return', {module(),'init', term()}} + | {'bad_start_spec', term()} | {'start_spec', term()} + | {'supervisor_data', term()}. + +-spec init({init_sup_name(), module(), [term()]}) -> + {'ok', state()} | 'ignore' | {'stop', stop_rsn()}. + +-type call() :: 'which_children'. +-spec handle_call(call(), term(), state()) -> {'reply', term(), state()}. + +-spec handle_cast('null', state()) -> {'noreply', state()}. + +-spec handle_info(term(), state()) -> + {'noreply', state()} | {'stop', 'shutdown', state()}. + +-spec terminate(term(), state()) -> 'ok'. + +-spec code_change(term(), state(), term()) -> + {'ok', state()} | {'error', term()}. + +-else. + +-export([behaviour_info/1]). + behaviour_info(callbacks) -> [{init,1}]; behaviour_info(_Other) -> undefined. +-endif. + %%% --------------------------------------------------- %%% This is a general process supervisor built upon gen_server.erl. %%% Servers/processes should/could also be built using gen_server.erl. |