summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2012-04-19 13:10:35 +0100
committerEmile Joubert <emile@rabbitmq.com>2012-04-19 13:10:35 +0100
commit423e12d4f72380e60ec0dac06b809fd29c9f41d0 (patch)
tree16fc4968f090baced5fc8878ed5b1b15fe88e57f
parentedf4bcfe0ecddb6232a3830ba328053c0d4436e1 (diff)
parent0ecafa1504539aa01798c05e35dab7ec0a354350 (diff)
downloadrabbitmq-server-423e12d4f72380e60ec0dac06b809fd29c9f41d0.tar.gz
Merged bug24855 into default
-rw-r--r--Makefile6
-rw-r--r--include/rabbit_auth_backend_spec.hrl31
-rw-r--r--include/rabbit_auth_mechanism_spec.hrl28
-rw-r--r--include/rabbit_backing_queue_spec.hrl71
-rw-r--r--include/rabbit_exchange_type_spec.hrl38
-rw-r--r--include/rabbit_msg_store_index.hrl45
-rw-r--r--src/dtree.erl110
-rw-r--r--src/gen_server2.erl65
-rw-r--r--src/gm.erl103
-rw-r--r--src/mirrored_supervisor.erl76
-rw-r--r--src/pmon.erl64
-rw-r--r--src/rabbit_amqqueue.erl30
-rw-r--r--src/rabbit_amqqueue_process.erl197
-rw-r--r--src/rabbit_auth_backend.erl83
-rw-r--r--src/rabbit_auth_backend_internal.erl2
-rw-r--r--src/rabbit_auth_mechanism.erl56
-rw-r--r--src/rabbit_auth_mechanism_amqplain.erl2
-rw-r--r--src/rabbit_auth_mechanism_cr_demo.erl2
-rw-r--r--src/rabbit_auth_mechanism_plain.erl2
-rw-r--r--src/rabbit_backing_queue.erl348
-rw-r--r--src/rabbit_backing_queue_qc.erl2
-rw-r--r--src/rabbit_basic.erl9
-rw-r--r--src/rabbit_channel.erl40
-rw-r--r--src/rabbit_exchange.erl5
-rw-r--r--src/rabbit_exchange_type.erl66
-rw-r--r--src/rabbit_exchange_type_direct.erl1
-rw-r--r--src/rabbit_exchange_type_fanout.erl1
-rw-r--r--src/rabbit_exchange_type_headers.erl1
-rw-r--r--src/rabbit_exchange_type_invalid.erl1
-rw-r--r--src/rabbit_exchange_type_topic.erl1
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl21
-rw-r--r--src/rabbit_mirror_queue_master.erl13
-rw-r--r--src/rabbit_mirror_queue_slave.erl53
-rw-r--r--src/rabbit_misc.erl5
-rw-r--r--src/rabbit_msg_store.erl3
-rw-r--r--src/rabbit_msg_store_ets_index.erl4
-rw-r--r--src/rabbit_msg_store_index.erl27
-rw-r--r--src/rabbit_networking.erl11
-rw-r--r--src/rabbit_queue_collector.erl36
-rw-r--r--src/rabbit_tests.erl4
-rw-r--r--src/rabbit_variable_queue.erl21
-rw-r--r--src/supervisor2.erl135
42 files changed, 986 insertions, 833 deletions
diff --git a/Makefile b/Makefile
index d16cd4d0..db7462a6 100644
--- a/Makefile
+++ b/Makefile
@@ -42,9 +42,9 @@ BASIC_PLT=basic.plt
RABBIT_PLT=rabbit.plt
ifndef USE_SPECS
-# our type specs rely on features and bug fixes in dialyzer that are
-# only available in R14B03 upwards (R14B03 is erts 5.8.4)
-USE_SPECS:=$(shell erl -noshell -eval 'io:format([list_to_integer(X) || X <- string:tokens(erlang:system_info(version), ".")] >= [5,8,4]), halt().')
+# our type specs rely on callback specs, which are available in R15B
+# upwards.
+USE_SPECS:=$(shell erl -noshell -eval 'io:format([list_to_integer(X) || X <- string:tokens(erlang:system_info(version), ".")] >= [5,9]), halt().')
endif
ifndef USE_PROPER_QC
diff --git a/include/rabbit_auth_backend_spec.hrl b/include/rabbit_auth_backend_spec.hrl
deleted file mode 100644
index 61a2e22a..00000000
--- a/include/rabbit_auth_backend_spec.hrl
+++ /dev/null
@@ -1,31 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
-%%
-
--ifdef(use_specs).
-
--spec(description/0 :: () -> [{atom(), any()}]).
-
--spec(check_user_login/2 :: (rabbit_types:username(), [term()]) ->
- {'ok', rabbit_types:user()} |
- {'refused', string(), [any()]} |
- {'error', any()}).
--spec(check_vhost_access/2 :: (rabbit_types:user(), rabbit_types:vhost()) ->
- boolean() | {'error', any()}).
--spec(check_resource_access/3 :: (rabbit_types:user(),
- rabbit_types:r(atom()),
- rabbit_access_control:permission_atom()) ->
- boolean() | {'error', any()}).
--endif.
diff --git a/include/rabbit_auth_mechanism_spec.hrl b/include/rabbit_auth_mechanism_spec.hrl
deleted file mode 100644
index 9a2f5e05..00000000
--- a/include/rabbit_auth_mechanism_spec.hrl
+++ /dev/null
@@ -1,28 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
-%%
-
--ifdef(use_specs).
-
--spec(description/0 :: () -> [{atom(), any()}]).
--spec(should_offer/1 :: (rabbit_net:socket()) -> boolean()).
--spec(init/1 :: (rabbit_net:socket()) -> any()).
--spec(handle_response/2 :: (binary(), any()) ->
- {'ok', rabbit_types:user()} |
- {'challenge', binary(), any()} |
- {'protocol_error', string(), [any()]} |
- {'refused', string(), [any()]}).
-
--endif.
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
deleted file mode 100644
index 79d44e1b..00000000
--- a/include/rabbit_backing_queue_spec.hrl
+++ /dev/null
@@ -1,71 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
-%%
-
--type(fetch_result(Ack) ::
- ('empty' |
- %% Message, IsDelivered, AckTag, Remaining_Len
- {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})).
--type(is_durable() :: boolean()).
--type(attempt_recovery() :: boolean()).
--type(purged_msg_count() :: non_neg_integer()).
--type(confirm_required() :: boolean()).
--type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
--type(duration() :: ('undefined' | 'infinity' | number())).
-
--type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') |
- 'undefined').
-
--spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok').
--spec(stop/0 :: () -> 'ok').
--spec(init/3 :: (rabbit_types:amqqueue(), attempt_recovery(),
- async_callback()) -> state()).
--spec(terminate/2 :: (any(), state()) -> state()).
--spec(delete_and_terminate/2 :: (any(), state()) -> state()).
--spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
--spec(publish/4 :: (rabbit_types:basic_message(),
- rabbit_types:message_properties(), pid(), state()) ->
- state()).
--spec(publish_delivered/5 :: (true, rabbit_types:basic_message(),
- rabbit_types:message_properties(), pid(), state())
- -> {ack(), state()};
- (false, rabbit_types:basic_message(),
- rabbit_types:message_properties(), pid(), state())
- -> {undefined, state()}).
--spec(drain_confirmed/1 :: (state()) -> {[rabbit_guid:guid()], state()}).
--spec(dropwhile/3 ::
- (fun ((rabbit_types:message_properties()) -> boolean()), msg_fun(),
- state())
- -> state()).
--spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()};
- (false, state()) -> {fetch_result(undefined), state()}).
--spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
--spec(fold/3 :: (msg_fun(), state(), [ack()]) -> state()).
--spec(requeue/2 :: ([ack()], state())
- -> {[rabbit_guid:guid()], state()}).
--spec(len/1 :: (state()) -> non_neg_integer()).
--spec(is_empty/1 :: (state()) -> boolean()).
--spec(set_ram_duration_target/2 ::
- (duration(), state()) -> state()).
--spec(ram_duration/1 :: (state()) -> {duration(), state()}).
--spec(needs_timeout/1 :: (state()) -> 'false' | 'timed' | 'idle').
--spec(timeout/1 :: (state()) -> state()).
--spec(handle_pre_hibernate/1 :: (state()) -> state()).
--spec(status/1 :: (state()) -> [{atom(), any()}]).
--spec(invoke/3 :: (atom(), fun ((atom(), A) -> A), state()) -> state()).
--spec(is_duplicate/2 ::
- (rabbit_types:basic_message(), state()) ->
- {'false'|'published'|'discarded', state()}).
--spec(discard/3 :: (rabbit_types:basic_message(), pid(), state()) -> state()).
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl
deleted file mode 100644
index 8f7e22d3..00000000
--- a/include/rabbit_exchange_type_spec.hrl
+++ /dev/null
@@ -1,38 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
-%%
-
--ifdef(use_specs).
-
--type(tx() :: 'transaction' | 'none').
--type(serial() :: pos_integer() | tx()).
-
--spec(description/0 :: () -> [{atom(), any()}]).
--spec(serialise_events/0 :: () -> boolean()).
--spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
- -> rabbit_router:match_result()).
--spec(validate/1 :: (rabbit_types:exchange()) -> 'ok').
--spec(create/2 :: (tx(), rabbit_types:exchange()) -> 'ok').
--spec(delete/3 :: (tx(), rabbit_types:exchange(),
- [rabbit_types:binding()]) -> 'ok').
--spec(add_binding/3 :: (serial(), rabbit_types:exchange(),
- rabbit_types:binding()) -> 'ok').
--spec(remove_bindings/3 :: (serial(), rabbit_types:exchange(),
- [rabbit_types:binding()]) -> 'ok').
--spec(assert_args_equivalence/2 ::
- (rabbit_types:exchange(), rabbit_framing:amqp_table())
- -> 'ok' | rabbit_types:connection_exit()).
-
--endif.
diff --git a/include/rabbit_msg_store_index.hrl b/include/rabbit_msg_store_index.hrl
deleted file mode 100644
index 75d7eb71..00000000
--- a/include/rabbit_msg_store_index.hrl
+++ /dev/null
@@ -1,45 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
-%%
-
--include("rabbit_msg_store.hrl").
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--type(dir() :: any()).
--type(index_state() :: any()).
--type(keyvalue() :: any()).
--type(fieldpos() :: non_neg_integer()).
--type(fieldvalue() :: any()).
-
--spec(new/1 :: (dir()) -> index_state()).
--spec(recover/1 :: (dir()) -> rabbit_types:ok_or_error2(index_state(), any())).
--spec(lookup/2 ::
- (rabbit_types:msg_id(), index_state()) -> ('not_found' | keyvalue())).
--spec(insert/2 :: (keyvalue(), index_state()) -> 'ok').
--spec(update/2 :: (keyvalue(), index_state()) -> 'ok').
--spec(update_fields/3 :: (rabbit_types:msg_id(), ({fieldpos(), fieldvalue()} |
- [{fieldpos(), fieldvalue()}]),
- index_state()) -> 'ok').
--spec(delete/2 :: (rabbit_types:msg_id(), index_state()) -> 'ok').
--spec(delete_object/2 :: (keyvalue(), index_state()) -> 'ok').
--spec(delete_by_file/2 :: (fieldvalue(), index_state()) -> 'ok').
--spec(terminate/1 :: (index_state()) -> any()).
-
--endif.
-
-%%----------------------------------------------------------------------------
diff --git a/src/dtree.erl b/src/dtree.erl
index 265bb340..ca2d30cf 100644
--- a/src/dtree.erl
+++ b/src/dtree.erl
@@ -16,23 +16,23 @@
%% A dual-index tree.
%%
-%% Conceptually, what we want is a map that has two distinct sets of
-%% keys (referred to here as primary and secondary, although that
-%% shouldn't imply a hierarchy) pointing to one set of
-%% values. However, in practice what we'll always want to do is insert
-%% a value that's pointed at by (one primary, many secondaries) and
-%% remove values that are pointed at by (one secondary, many
-%% primaries) or (one secondary, all primaries). Thus the API.
+%% Entries have the following shape:
%%
-%% Entries exists while they have a non-empty secondary key set. The
-%% 'take' operations return the entries that got removed, i.e. that
-%% had no remaining secondary keys. take/3 expects entries to exist
-%% with the supplied primary keys and secondary key. take/2 can cope
-%% with the supplied secondary key having no entries.
+%% +----+--------------------+---+
+%% | PK | SK1, SK2, ..., SKN | V |
+%% +----+--------------------+---+
+%%
+%% i.e. a primary key, set of secondary keys, and a value.
+%%
+%% There can be only one entry per primary key, but secondary keys may
+%% appear in multiple entries.
+%%
+%% The set of secondary keys must be non-empty. Or, to put it another
+%% way, entries only exist while their secondary key set is non-empty.
-module(dtree).
--export([empty/0, insert/4, take/3, take/2,
+-export([empty/0, insert/4, take/3, take/2, take_all/2,
is_defined/2, is_empty/1, smallest/1, size/1]).
%%----------------------------------------------------------------------------
@@ -52,6 +52,7 @@
-spec(insert/4 :: (pk(), [sk()], val(), ?MODULE()) -> ?MODULE()).
-spec(take/3 :: ([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
-spec(take/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
+-spec(take_all/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}).
-spec(is_defined/2 :: (sk(), ?MODULE()) -> boolean()).
-spec(is_empty/1 :: (?MODULE()) -> boolean()).
-spec(smallest/1 :: (?MODULE()) -> kv()).
@@ -63,6 +64,12 @@
empty() -> {gb_trees:empty(), gb_trees:empty()}.
+%% Insert an entry. Fails if there already is an entry with the given
+%% primary key.
+insert(PK, [], V, {P, S}) ->
+ %% dummy insert to force error if PK exists
+ gb_trees:insert(PK, {gb_sets:empty(), V}, P),
+ {P, S};
insert(PK, SKs, V, {P, S}) ->
{gb_trees:insert(PK, {gb_sets:from_list(SKs), V}, P),
lists:foldl(fun (SK, S0) ->
@@ -74,21 +81,45 @@ insert(PK, SKs, V, {P, S}) ->
end
end, S, SKs)}.
+%% Remove the given secondary key from the entries of the given
+%% primary keys, returning the primary-key/value pairs of any entries
+%% that were dropped as the result (i.e. due to their secondary key
+%% set becoming empty). It is ok for the given primary keys and/or
+%% secondary key to not exist.
take(PKs, SK, {P, S}) ->
- {KVs, P1} = take2(PKs, SK, P),
- PKS = gb_sets:difference(gb_trees:get(SK, S), gb_sets:from_list(PKs)),
- {KVs, {P1, case gb_sets:is_empty(PKS) of
- true -> gb_trees:delete(SK, S);
- false -> gb_trees:update(SK, PKS, S)
- end}}.
+ case gb_trees:lookup(SK, S) of
+ none -> {[], {P, S}};
+ {value, PKS} -> TakenPKS = gb_sets:from_list(PKs),
+ PKSInter = gb_sets:intersection(PKS, TakenPKS),
+ PKSDiff = gb_sets_difference (PKS, PKSInter),
+ {KVs, P1} = take2(PKSInter, SK, P),
+ {KVs, {P1, case gb_sets:is_empty(PKSDiff) of
+ true -> gb_trees:delete(SK, S);
+ false -> gb_trees:update(SK, PKSDiff, S)
+ end}}
+ end.
+%% Remove the given secondary key from all entries, returning the
+%% primary-key/value pairs of any entries that were dropped as the
+%% result (i.e. due to their secondary key set becoming empty). It is
+%% ok for the given secondary key to not exist.
take(SK, {P, S}) ->
case gb_trees:lookup(SK, S) of
none -> {[], {P, S}};
- {value, PKS} -> {KVs, P1} = take2(gb_sets:to_list(PKS), SK, P),
+ {value, PKS} -> {KVs, P1} = take2(PKS, SK, P),
{KVs, {P1, gb_trees:delete(SK, S)}}
end.
+%% Drop all entries which contain the given secondary key, returning
+%% the primary-key/value pairs of these entries. It is ok for the
+%% given secondary key to not exist.
+take_all(SK, {P, S}) ->
+ case gb_trees:lookup(SK, S) of
+ none -> {[], {P, S}};
+ {value, PKS} -> {KVs, SKS, P1} = take_all2(PKS, P),
+ {KVs, {P1, prune(SKS, PKS, S)}}
+ end.
+
is_defined(SK, {_P, S}) -> gb_trees:is_defined(SK, S).
is_empty({P, _S}) -> gb_trees:is_empty(P).
@@ -100,12 +131,33 @@ size({P, _S}) -> gb_trees:size(P).
%%----------------------------------------------------------------------------
-take2(PKs, SK, P) ->
- lists:foldl(fun (PK, {KVs, P0}) ->
- {SKS, V} = gb_trees:get(PK, P0),
- SKS1 = gb_sets:delete(SK, SKS),
- case gb_sets:is_empty(SKS1) of
- true -> {[{PK, V} | KVs], gb_trees:delete(PK, P0)};
- false -> {KVs, gb_trees:update(PK, {SKS1, V}, P0)}
- end
- end, {[], P}, PKs).
+take2(PKS, SK, P) ->
+ gb_sets:fold(fun (PK, {KVs, P0}) ->
+ {SKS, V} = gb_trees:get(PK, P0),
+ SKS1 = gb_sets:delete(SK, SKS),
+ case gb_sets:is_empty(SKS1) of
+ true -> KVs1 = [{PK, V} | KVs],
+ {KVs1, gb_trees:delete(PK, P0)};
+ false -> {KVs, gb_trees:update(PK, {SKS1, V}, P0)}
+ end
+ end, {[], P}, PKS).
+
+take_all2(PKS, P) ->
+ gb_sets:fold(fun (PK, {KVs, SKS0, P0}) ->
+ {SKS, V} = gb_trees:get(PK, P0),
+ {[{PK, V} | KVs], gb_sets:union(SKS, SKS0),
+ gb_trees:delete(PK, P0)}
+ end, {[], gb_sets:empty(), P}, PKS).
+
+prune(SKS, PKS, S) ->
+ gb_sets:fold(fun (SK0, S0) ->
+ PKS1 = gb_trees:get(SK0, S0),
+ PKS2 = gb_sets_difference(PKS1, PKS),
+ case gb_sets:is_empty(PKS2) of
+ true -> gb_trees:delete(SK0, S0);
+ false -> gb_trees:update(SK0, PKS2, S0)
+ end
+ end, S, SKS).
+
+gb_sets_difference(S1, S2) ->
+ gb_sets:fold(fun gb_sets:delete_any/2, S1, S2).
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index f8537487..78bbbe06 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -31,13 +31,13 @@
%% handle_pre_hibernate/1 then the default action is to hibernate.
%%
%% 6) init can return a 4th arg, {backoff, InitialTimeout,
-%% MinimumTimeout, DesiredHibernatePeriod} (all in
-%% milliseconds). Then, on all callbacks which can return a timeout
-%% (including init), timeout can be 'hibernate'. When this is the
-%% case, the current timeout value will be used (initially, the
-%% InitialTimeout supplied from init). After this timeout has
-%% occurred, hibernation will occur as normal. Upon awaking, a new
-%% current timeout value will be calculated.
+%% MinimumTimeout, DesiredHibernatePeriod} (all in milliseconds,
+%% 'infinity' does not make sense here). Then, on all callbacks which
+%% can return a timeout (including init), timeout can be
+%% 'hibernate'. When this is the case, the current timeout value will
+%% be used (initially, the InitialTimeout supplied from init). After
+%% this timeout has occurred, hibernation will occur as normal. Upon
+%% awaking, a new current timeout value will be calculated.
%%
%% The purpose is that the gen_server2 takes care of adjusting the
%% current timeout value such that the process will increase the
@@ -135,9 +135,10 @@
%%% Reason = normal | shutdown | Term, terminate(State) is called
%%%
%%% terminate(Reason, State) Let the user module clean up
+%%% Reason = normal | shutdown | {shutdown, Term} | Term
%%% always called when server terminates
%%%
-%%% ==> ok
+%%% ==> ok | Term
%%%
%%% handle_pre_hibernate(State)
%%%
@@ -182,8 +183,6 @@
multi_call/2, multi_call/3, multi_call/4,
enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/1]).
--export([behaviour_info/1]).
-
%% System exports
-export([system_continue/3,
system_terminate/4,
@@ -200,12 +199,12 @@
timeout_state, queue, debug, prioritise_call,
prioritise_cast, prioritise_info}).
+-ifdef(use_specs).
+
%%%=========================================================================
%%% Specs. These exist only to shut up dialyzer's warnings
%%%=========================================================================
--ifdef(use_specs).
-
-type(gs2_state() :: #gs2_state{}).
-spec(handle_common_termination/3 ::
@@ -214,18 +213,58 @@
-spec(pre_hibernate/1 :: (gs2_state()) -> no_return()).
-spec(system_terminate/4 :: (_, _, _, gs2_state()) -> no_return()).
--endif.
+-type(millis() :: non_neg_integer()).
%%%=========================================================================
%%% API
%%%=========================================================================
+-callback init(Args :: term()) ->
+ {ok, State :: term()} |
+ {ok, State :: term(), timeout() | hibernate} |
+ {ok, State :: term(), timeout() | hibernate,
+ {backoff, millis(), millis(), millis()}} |
+ ignore |
+ {stop, Reason :: term()}.
+-callback handle_call(Request :: term(), From :: {pid(), Tag :: term()},
+ State :: term()) ->
+ {reply, Reply :: term(), NewState :: term()} |
+ {reply, Reply :: term(), NewState :: term(), timeout() | hibernate} |
+ {noreply, NewState :: term()} |
+ {noreply, NewState :: term(), timeout() | hibernate} |
+ {stop, Reason :: term(),
+ Reply :: term(), NewState :: term()}.
+-callback handle_cast(Request :: term(), State :: term()) ->
+ {noreply, NewState :: term()} |
+ {noreply, NewState :: term(), timeout() | hibernate} |
+ {stop, Reason :: term(), NewState :: term()}.
+-callback handle_info(Info :: term(), State :: term()) ->
+ {noreply, NewState :: term()} |
+ {noreply, NewState :: term(), timeout() | hibernate} |
+ {stop, Reason :: term(), NewState :: term()}.
+-callback terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
+ State :: term()) ->
+ ok | term().
+-callback code_change(OldVsn :: (term() | {down, term()}), State :: term(),
+ Extra :: term()) ->
+ {ok, NewState :: term()} | {error, Reason :: term()}.
+
+%% It's not possible to define "optional" -callbacks, so putting specs
+%% for handle_pre_hibernate/1 and handle_post_hibernate/1 will result
+%% in warnings (the same applied for the behaviour_info before).
+
+-else.
+
+-export([behaviour_info/1]).
+
behaviour_info(callbacks) ->
[{init,1},{handle_call,3},{handle_cast,2},{handle_info,2},
{terminate,2},{code_change,3}];
behaviour_info(_Other) ->
undefined.
+-endif.
+
%%% -----------------------------------------------------------------
%%% Starts a generic server.
%%% start(Mod, Args, Options)
diff --git a/src/gm.erl b/src/gm.erl
index 6f9ff564..01300f18 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -57,8 +57,8 @@
%% you wish to be passed into the callback module's functions. The
%% joined/2 function will be called when we have joined the group,
%% with the arguments passed to start_link and a list of the current
-%% members of the group. See the comments in behaviour_info/1 below
-%% for further details of the callback functions.
+%% members of the group. See the callbacks specs and the comments
+%% below for further details of the callback functions.
%%
%% leave/1
%% Provide the Pid. Removes the Pid from the group. The callback
@@ -378,7 +378,9 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, prioritise_info/2]).
+-ifndef(use_specs).
-export([behaviour_info/1]).
+-endif.
-export([table_definitions/0]).
@@ -431,56 +433,63 @@
-spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok').
-spec(group_members/1 :: (pid()) -> [pid()]).
--endif.
+%% The joined, members_changed and handle_msg callbacks can all
+%% return any of the following terms:
+%%
+%% 'ok' - the callback function returns normally
+%%
+%% {'stop', Reason} - the callback indicates the member should
+%% stop with reason Reason and should leave the group.
+%%
+%% {'become', Module, Args} - the callback indicates that the
+%% callback module should be changed to Module and that the
+%% callback functions should now be passed the arguments
+%% Args. This allows the callback module to be dynamically
+%% changed.
+
+%% Called when we've successfully joined the group. Supplied with
+%% Args provided in start_link, plus current group members.
+-callback joined(Args :: term(), Members :: [pid()]) ->
+ ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
+
+%% Supplied with Args provided in start_link, the list of new
+%% members and the list of members previously known to us that
+%% have since died. Note that if a member joins and dies very
+%% quickly, it's possible that we will never see that member
+%% appear in either births or deaths. However we are guaranteed
+%% that (1) we will see a member joining either in the births
+%% here, or in the members passed to joined/2 before receiving
+%% any messages from it; and (2) we will not see members die that
+%% we have not seen born (or supplied in the members to
+%% joined/2).
+-callback members_changed(Args :: term(), Births :: [pid()],
+ Deaths :: [pid()]) ->
+ ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
+
+%% Supplied with Args provided in start_link, the sender, and the
+%% message. This does get called for messages injected by this
+%% member, however, in such cases, there is no special
+%% significance of this invocation: it does not indicate that the
+%% message has made it to any other members, let alone all other
+%% members.
+-callback handle_msg(Args :: term(), From :: pid(), Message :: term()) ->
+ ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
+
+%% Called on gm member termination as per rules in gen_server,
+%% with the Args provided in start_link plus the termination
+%% Reason.
+-callback terminate(Args :: term(), Reason :: term()) ->
+ ok | term().
+
+-else.
behaviour_info(callbacks) ->
- [
- %% The joined, members_changed and handle_msg callbacks can all
- %% return any of the following terms:
- %%
- %% 'ok' - the callback function returns normally
- %%
- %% {'stop', Reason} - the callback indicates the member should
- %% stop with reason Reason and should leave the group.
- %%
- %% {'become', Module, Args} - the callback indicates that the
- %% callback module should be changed to Module and that the
- %% callback functions should now be passed the arguments
- %% Args. This allows the callback module to be dynamically
- %% changed.
-
- %% Called when we've successfully joined the group. Supplied with
- %% Args provided in start_link, plus current group members.
- {joined, 2},
-
- %% Supplied with Args provided in start_link, the list of new
- %% members and the list of members previously known to us that
- %% have since died. Note that if a member joins and dies very
- %% quickly, it's possible that we will never see that member
- %% appear in either births or deaths. However we are guaranteed
- %% that (1) we will see a member joining either in the births
- %% here, or in the members passed to joined/2 before receiving
- %% any messages from it; and (2) we will not see members die that
- %% we have not seen born (or supplied in the members to
- %% joined/2).
- {members_changed, 3},
-
- %% Supplied with Args provided in start_link, the sender, and the
- %% message. This does get called for messages injected by this
- %% member, however, in such cases, there is no special
- %% significance of this invocation: it does not indicate that the
- %% message has made it to any other members, let alone all other
- %% members.
- {handle_msg, 3},
-
- %% Called on gm member termination as per rules in gen_server,
- %% with the Args provided in start_link plus the termination
- %% Reason.
- {terminate, 2}
- ];
+ [{joined, 2}, {members_changed, 3}, {handle_msg, 3}, {terminate, 2}];
behaviour_info(_Other) ->
undefined.
+-endif.
+
create_tables() ->
create_tables([?TABLE]).
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index a599effa..221f6a87 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -120,8 +120,6 @@
delete_child/2, terminate_child/2,
which_children/1, count_children/1, check_childspecs/1]).
--export([behaviour_info/1]).
-
-behaviour(?GEN_SERVER).
-behaviour(?SUPERVISOR).
@@ -142,15 +140,20 @@
-ifdef(use_specs).
--type child() :: pid() | 'undefined'.
--type child_id() :: term().
--type modules() :: [module()] | 'dynamic'.
--type worker() :: 'worker' | 'supervisor'.
--type sup_name() :: {'local', Name :: atom()} | {'global', Name :: atom()}.
--type sup_ref() :: (Name :: atom())
- | {Name :: atom(), Node :: node()}
- | {'global', Name :: atom()}
- | pid().
+%%--------------------------------------------------------------------------
+%% Callback behaviour
+%%--------------------------------------------------------------------------
+
+-callback init(Args :: term()) ->
+ {ok, {{RestartStrategy :: supervisor2:strategy(),
+ MaxR :: non_neg_integer(),
+ MaxT :: non_neg_integer()},
+ [ChildSpec :: supervisor2:child_spec()]}}
+ | ignore.
+
+%%--------------------------------------------------------------------------
+%% Specs
+%%--------------------------------------------------------------------------
-type startlink_err() :: {'already_started', pid()} | 'shutdown' | term().
-type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}.
@@ -163,54 +166,26 @@
Args :: term().
-spec start_link(SupName, GroupName, Module, Args) -> startlink_ret() when
- SupName :: sup_name(),
+ SupName :: supervisor2:sup_name(),
GroupName :: group_name(),
Module :: module(),
Args :: term().
--spec start_child(SupRef, ChildSpec) -> supervisor:startchild_ret() when
- SupRef :: sup_ref(),
- ChildSpec :: supervisor:child_spec() | (List :: [term()]).
-
--spec restart_child(SupRef, Id) -> Result when
- SupRef :: sup_ref(),
- Id :: child_id(),
- Result :: {'ok', Child :: child()}
- | {'ok', Child :: child(), Info :: term()}
- | {'error', Error},
- Error :: 'running' | 'not_found' | 'simple_one_for_one' | term().
-
--spec delete_child(SupRef, Id) -> Result when
- SupRef :: sup_ref(),
- Id :: child_id(),
- Result :: 'ok' | {'error', Error},
- Error :: 'running' | 'not_found' | 'simple_one_for_one'.
-
--spec terminate_child(SupRef, Id) -> Result when
- SupRef :: sup_ref(),
- Id :: pid() | child_id(),
- Result :: 'ok' | {'error', Error},
- Error :: 'not_found' | 'simple_one_for_one'.
-
--spec which_children(SupRef) -> [{Id,Child,Type,Modules}] when
- SupRef :: sup_ref(),
- Id :: child_id() | 'undefined',
- Child :: child(),
- Type :: worker(),
- Modules :: modules().
-
--spec check_childspecs(ChildSpecs) -> Result when
- ChildSpecs :: [supervisor:child_spec()],
- Result :: 'ok' | {'error', Error :: term()}.
-
-spec start_internal(Group, ChildSpecs) -> Result when
Group :: group_name(),
- ChildSpecs :: [supervisor:child_spec()],
- Result :: startlink_ret().
+ ChildSpecs :: [supervisor2:child_spec()],
+ Result :: supervisor2:startlink_ret().
-spec create_tables() -> Result when
Result :: 'ok'.
+-else.
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) -> [{init,1}];
+behaviour_info(_Other) -> undefined.
+
-endif.
%%----------------------------------------------------------------------------
@@ -250,9 +225,6 @@ which_children(Sup) -> fold(which_children, Sup, fun lists:append/2).
count_children(Sup) -> fold(count_children, Sup, fun add_proplists/2).
check_childspecs(Specs) -> ?SUPERVISOR:check_childspecs(Specs).
-behaviour_info(callbacks) -> [{init,1}];
-behaviour_info(_Other) -> undefined.
-
call(Sup, Msg) ->
?GEN_SERVER:call(child(Sup, mirroring), Msg, infinity).
diff --git a/src/pmon.erl b/src/pmon.erl
new file mode 100644
index 00000000..45786577
--- /dev/null
+++ b/src/pmon.erl
@@ -0,0 +1,64 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(pmon).
+
+-export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2,
+ monitored/1, is_empty/1]).
+
+-ifdef(use_specs).
+
+%%----------------------------------------------------------------------------
+
+-export_type([?MODULE/0]).
+
+-opaque(?MODULE() :: dict()).
+
+-spec(new/0 :: () -> ?MODULE()).
+-spec(monitor/2 :: (pid(), ?MODULE()) -> ?MODULE()).
+-spec(monitor_all/2 :: ([pid()], ?MODULE()) -> ?MODULE()).
+-spec(demonitor/2 :: (pid(), ?MODULE()) -> ?MODULE()).
+-spec(is_monitored/2 :: (pid(), ?MODULE()) -> boolean()).
+-spec(erase/2 :: (pid(), ?MODULE()) -> ?MODULE()).
+-spec(monitored/1 :: (?MODULE()) -> [pid()]).
+-spec(is_empty/1 :: (?MODULE()) -> boolean()).
+
+-endif.
+
+new() -> dict:new().
+
+monitor(Pid, M) ->
+ case dict:is_key(Pid, M) of
+ true -> M;
+ false -> dict:store(Pid, erlang:monitor(process, Pid), M)
+ end.
+
+monitor_all(Pids, M) -> lists:foldl(fun monitor/2, M, Pids).
+
+demonitor(Pid, M) ->
+ case dict:find(Pid, M) of
+ {ok, MRef} -> erlang:demonitor(MRef),
+ dict:erase(Pid, M);
+ error -> M
+ end.
+
+is_monitored(Pid, M) -> dict:is_key(Pid, M).
+
+erase(Pid, M) -> dict:erase(Pid, M).
+
+monitored(M) -> dict:fetch_keys(M).
+
+is_empty(M) -> dict:size(M) == 0.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 9ecbcbc3..c1673504 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -109,7 +109,7 @@
-spec(stat/1 ::
(rabbit_types:amqqueue())
-> {'ok', non_neg_integer(), non_neg_integer()}).
--spec(delete_immediately/1 :: (rabbit_types:amqqueue()) -> 'ok').
+-spec(delete_immediately/1 :: (qpids()) -> 'ok').
-spec(delete/3 ::
(rabbit_types:amqqueue(), 'false', 'false')
-> qlen();
@@ -331,7 +331,7 @@ assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
check_declare_arguments(QueueName, Args) ->
Checks = [{<<"x-expires">>, fun check_positive_int_arg/2},
- {<<"x-message-ttl">>, fun check_positive_int_arg/2},
+ {<<"x-message-ttl">>, fun check_non_neg_int_arg/2},
{<<"x-ha-policy">>, fun check_ha_policy_arg/2},
{<<"x-dead-letter-exchange">>, fun check_string_arg/2},
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}],
@@ -353,11 +353,24 @@ check_string_arg({longstr, _}, _Args) ->
check_string_arg({Type, _}, _) ->
{error, {unacceptable_type, Type}}.
-check_positive_int_arg({Type, Val}, _Args) ->
+check_int_arg({Type, _}, _) ->
case lists:member(Type, ?INTEGER_ARG_TYPES) of
- false -> {error, {unacceptable_type, Type}};
- true when Val =< 0 -> {error, {value_zero_or_less, Val}};
- true -> ok
+ true -> ok;
+ false -> {error, {unacceptable_type, Type}}
+ end.
+
+check_positive_int_arg({Type, Val}, Args) ->
+ case check_int_arg({Type, Val}, Args) of
+ ok when Val > 0 -> ok;
+ ok -> {error, {value_zero_or_less, Val}};
+ Error -> Error
+ end.
+
+check_non_neg_int_arg({Type, Val}, Args) ->
+ case check_int_arg({Type, Val}, Args) of
+ ok when Val >= 0 -> ok;
+ ok -> {error, {value_less_than_zero, Val}};
+ Error -> Error
end.
check_dlxrk_arg({longstr, _}, Args) ->
@@ -455,8 +468,9 @@ consumers_all(VHostPath) ->
stat(#amqqueue{pid = QPid}) ->
delegate_call(QPid, stat).
-delete_immediately(#amqqueue{ pid = QPid }) ->
- gen_server2:cast(QPid, delete_immediately).
+delete_immediately(QPids) ->
+ [gen_server2:cast(QPid, delete_immediately) || QPid <- QPids],
+ ok.
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
delegate_call(QPid, {delete, IfUnused, IfEmpty}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 75b92f1f..2063e557 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -26,7 +26,7 @@
-export([start_link/1, info_keys/0]).
--export([init_with_backing_queue_state/7]).
+-export([init_with_backing_queue_state/8]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
@@ -47,6 +47,7 @@
msg_id_to_channel,
ttl,
ttl_timer_ref,
+ senders,
publish_seqno,
unconfirmed,
delayed_stop,
@@ -74,9 +75,9 @@
-spec(start_link/1 ::
(rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
--spec(init_with_backing_queue_state/7 ::
+-spec(init_with_backing_queue_state/8 ::
(rabbit_types:amqqueue(), atom(), tuple(), any(), [any()],
- [rabbit_types:delivery()], dict()) -> #q{}).
+ [rabbit_types:delivery()], pmon:pmon(), dict()) -> #q{}).
-endif.
@@ -131,18 +132,19 @@ init(Q) ->
rate_timer_ref = undefined,
expiry_timer_ref = undefined,
ttl = undefined,
+ senders = pmon:new(),
dlx = undefined,
dlx_routing_key = undefined,
publish_seqno = 1,
unconfirmed = dtree:empty(),
delayed_stop = undefined,
- queue_monitors = dict:new(),
+ queue_monitors = pmon:new(),
msg_id_to_channel = gb_trees:empty()},
{ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
- RateTRef, AckTags, Deliveries, MTC) ->
+ RateTRef, AckTags, Deliveries, Senders, MTC) ->
case Owner of
none -> ok;
_ -> erlang:monitor(process, Owner)
@@ -158,10 +160,11 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
rate_timer_ref = RateTRef,
expiry_timer_ref = undefined,
ttl = undefined,
+ senders = Senders,
publish_seqno = 1,
unconfirmed = dtree:empty(),
delayed_stop = undefined,
- queue_monitors = dict:new(),
+ queue_monitors = pmon:new(),
msg_id_to_channel = MTC},
State1 = requeue_and_run(AckTags, process_args(
rabbit_event:init_stats_timer(
@@ -538,17 +541,25 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm,
State#q{backing_queue_state = BQS1}}
end.
-deliver_or_enqueue(Delivery = #delivery{message = Message,
- sender = SenderPid}, State) ->
+deliver_or_enqueue(Delivery = #delivery{message = Message,
+ msg_seq_no = MsgSeqNo,
+ sender = SenderPid}, State) ->
Confirm = should_confirm_message(Delivery, State),
- {Delivered, State1} = attempt_delivery(Delivery, Confirm, State),
- State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- maybe_record_confirm_message(Confirm, State1),
- case Delivered of
- true -> State2;
- false -> Props = message_properties(Confirm, State),
- BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
- ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
+ case attempt_delivery(Delivery, Confirm, State) of
+ {true, State1} ->
+ maybe_record_confirm_message(Confirm, State1);
+ %% the next two are optimisations
+ {false, State1 = #q{ttl = 0, dlx = undefined}} when Confirm == never ->
+ discard_delivery(Delivery, State1);
+ {false, State1 = #q{ttl = 0, dlx = undefined}} ->
+ rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
+ discard_delivery(Delivery, State1);
+ {false, State1} ->
+ State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ maybe_record_confirm_message(Confirm, State1),
+ Props = message_properties(Confirm, State2),
+ BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
+ ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
@@ -597,16 +608,16 @@ should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
should_auto_delete(#q{has_had_consumers = false}) -> false;
should_auto_delete(State) -> is_unused(State).
-handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
- case get({ch_publisher, DownPid}) of
- undefined -> ok;
- MRef -> erlang:demonitor(MRef),
- erase({ch_publisher, DownPid}),
- credit_flow:peer_down(DownPid)
- end,
+handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
+ senders = Senders}) ->
+ Senders1 = case pmon:is_monitored(DownPid, Senders) of
+ false -> Senders;
+ true -> credit_flow:peer_down(DownPid),
+ pmon:demonitor(DownPid, Senders)
+ end,
case lookup_ch(DownPid) of
not_found ->
- {ok, State};
+ {ok, State#q{senders = Senders1}};
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
blocked_consumers = Blocked} ->
@@ -618,7 +629,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
Other -> Other
end,
active_consumers = remove_consumers(
- ChPid, State#q.active_consumers)},
+ ChPid, State#q.active_consumers),
+ senders = Senders1},
case should_auto_delete(State1) of
true -> {stop, State1};
false -> {ok, requeue_and_run(sets:to_list(ChAckTags),
@@ -719,57 +731,51 @@ dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) ->
State)
end.
+dead_letter_publish(Msg, Reason,
+ State = #q{publish_seqno = MsgSeqNo,
+ dlx = DLX}) ->
+ Delivery = #delivery{message = #basic_message{exchange_name = XName}} =
+ rabbit_basic:delivery(
+ false, false, make_dead_letter_msg(DLX, Reason, Msg, State),
+ MsgSeqNo),
+ {ok, X} = rabbit_exchange:lookup(XName),
+ Queues = rabbit_exchange:route(X, Delivery),
+ {Queues1, Cycles} = detect_dead_letter_cycles(Delivery, Queues),
+ lists:foreach(fun log_cycle_once/1, Cycles),
+ QPids = rabbit_amqqueue:lookup(Queues1),
+ {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery),
+ DeliveredQPids.
+
dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
State = #q{publish_seqno = MsgSeqNo,
- unconfirmed = UC,
- dlx = DLX}) ->
- {ok, _, QPids} =
- rabbit_basic:publish(
- rabbit_basic:delivery(
- false, false, make_dead_letter_msg(DLX, Reason, Msg, State),
- MsgSeqNo)),
- State1 = lists:foldl(fun monitor_queue/2, State, QPids),
- State2 = State1#q{publish_seqno = MsgSeqNo + 1},
+ unconfirmed = UC}) ->
+ QPids = dead_letter_publish(Msg, Reason, State),
+ State1 = State#q{queue_monitors = pmon:monitor_all(
+ QPids, State#q.queue_monitors),
+ publish_seqno = MsgSeqNo + 1},
case QPids of
- [] -> cleanup_after_confirm([AckTag], State2);
+ [] -> cleanup_after_confirm([AckTag], State1);
_ -> UC1 = dtree:insert(MsgSeqNo, QPids, AckTag, UC),
- noreply(State2#q{unconfirmed = UC1})
- end.
-
-monitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
- case dict:is_key(QPid, QMons) of
- true -> State;
- false -> State#q{queue_monitors =
- dict:store(QPid, erlang:monitor(process, QPid),
- QMons)}
- end.
-
-demonitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
- case dict:find(QPid, QMons) of
- {ok, MRef} -> erlang:demonitor(MRef),
- State#q{queue_monitors = dict:erase(QPid, QMons)};
- error -> State
+ noreply(State1#q{unconfirmed = UC1})
end.
handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
unconfirmed = UC}) ->
- case dict:find(QPid, QMons) of
- error ->
- noreply(State);
- {ok, _} ->
- rabbit_log:info("DLQ ~p (for ~s) died~n",
- [QPid, rabbit_misc:rs(qname(State))]),
- {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC),
- case (MsgSeqNoAckTags =/= [] andalso
- rabbit_misc:is_abnormal_termination(Reason)) of
- true -> rabbit_log:warning("Dead queue lost ~p messages~n",
- [length(MsgSeqNoAckTags)]);
- false -> ok
- end,
- cleanup_after_confirm(
- [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
- State#q{queue_monitors = dict:erase(QPid, QMons),
- unconfirmed = UC1})
+ case pmon:is_monitored(QPid, QMons) of
+ false -> noreply(State);
+ true -> case rabbit_misc:is_abnormal_termination(Reason) of
+ true -> {Lost, _UC1} = dtree:take_all(QPid, UC),
+ QNameS = rabbit_misc:rs(qname(State)),
+ rabbit_log:warning("DLQ ~p for ~s died with "
+ "~p unconfirmed messages~n",
+ [QPid, QNameS, length(Lost)]);
+ false -> ok
+ end,
+ {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC),
+ cleanup_after_confirm(
+ [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
+ State#q{queue_monitors = pmon:erase(QPid, QMons),
+ unconfirmed = UC1})
end.
stop_later(Reason, State) ->
@@ -801,28 +807,31 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
false -> noreply(State1)
end.
-already_been_here(_Delivery, #q{dlx = undefined}) ->
- false;
-already_been_here(#delivery{message = #basic_message{content = Content}},
- State) ->
+detect_dead_letter_cycles(#delivery{message = #basic_message{content = Content}},
+ Queues) ->
#content{properties = #'P_basic'{headers = Headers}} =
rabbit_binary_parser:ensure_content_decoded(Content),
- #resource{name = QueueName} = qname(State),
+ NoCycles = {Queues, []},
case Headers of
undefined ->
- false;
+ NoCycles;
_ ->
case rabbit_misc:table_lookup(Headers, <<"x-death">>) of
{array, DeathTables} ->
OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) ||
{table, D} <- DeathTables],
OldQueues1 = [QName || {longstr, QName} <- OldQueues],
- case lists:member(QueueName, OldQueues1) of
- true -> [QueueName | OldQueues1];
- _ -> false
- end;
+ OldQueuesSet = ordsets:from_list(OldQueues1),
+ {Cycling, NotCycling} =
+ lists:partition(
+ fun(Queue) ->
+ ordsets:is_element(Queue#resource.name,
+ OldQueuesSet)
+ end, Queues),
+ {NotCycling, [[QName | OldQueues1] ||
+ #resource{name = QName} <- Cycling]};
_ ->
- false
+ NoCycles
end
end.
@@ -1187,7 +1196,8 @@ handle_call(force_event_refresh, _From,
handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) ->
{MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC),
State1 = case dtree:is_defined(QPid, UC1) of
- false -> demonitor_queue(QPid, State);
+ false -> QMons = State#q.queue_monitors,
+ State#q{queue_monitors = pmon:demonitor(QPid, QMons)};
true -> State
end,
cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
@@ -1199,25 +1209,16 @@ handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
-handle_cast({deliver, Delivery = #delivery{sender = Sender,
- msg_seq_no = MsgSeqNo}, Flow},
- State) ->
+handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow},
+ State = #q{senders = Senders}) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- case Flow of
- flow -> Key = {ch_publisher, Sender},
- case get(Key) of
- undefined -> put(Key, erlang:monitor(process, Sender));
- _ -> ok
- end,
- credit_flow:ack(Sender);
- noflow -> ok
- end,
- case already_been_here(Delivery, State) of
- false -> noreply(deliver_or_enqueue(Delivery, State));
- Qs -> log_cycle_once(Qs),
- rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]),
- noreply(State)
- end;
+ Senders1 = case Flow of
+ flow -> credit_flow:ack(Sender),
+ pmon:monitor(Sender, Senders);
+ noflow -> Senders
+ end,
+ State1 = State#q{senders = Senders1},
+ noreply(deliver_or_enqueue(Delivery, State1));
handle_cast({ack, AckTags, ChPid}, State) ->
noreply(subtract_acks(
diff --git a/src/rabbit_auth_backend.erl b/src/rabbit_auth_backend.erl
index e0e252b8..e89951e7 100644
--- a/src/rabbit_auth_backend.erl
+++ b/src/rabbit_auth_backend.erl
@@ -16,42 +16,57 @@
-module(rabbit_auth_backend).
+-ifdef(use_specs).
+
+%% A description proplist as with auth mechanisms,
+%% exchanges. Currently unused.
+-callback description() -> [proplist:property()].
+
+%% Check a user can log in, given a username and a proplist of
+%% authentication information (e.g. [{password, Password}]).
+%%
+%% Possible responses:
+%% {ok, User}
+%% Authentication succeeded, and here's the user record.
+%% {error, Error}
+%% Something went wrong. Log and die.
+%% {refused, Msg, Args}
+%% Client failed authentication. Log and die.
+-callback check_user_login(rabbit_types:username(), [term()]) ->
+ {'ok', rabbit_types:user()} |
+ {'refused', string(), [any()]} |
+ {'error', any()}.
+
+%% Given #user and vhost, can a user log in to a vhost?
+%% Possible responses:
+%% true
+%% false
+%% {error, Error}
+%% Something went wrong. Log and die.
+-callback check_vhost_access(rabbit_types:user(), rabbit_types:vhost()) ->
+ boolean() | {'error', any()}.
+
+
+%% Given #user, resource and permission, can a user access a resource?
+%%
+%% Possible responses:
+%% true
+%% false
+%% {error, Error}
+%% Something went wrong. Log and die.
+-callback check_resource_access(rabbit_types:user(),
+ rabbit_types:r(atom()),
+ rabbit_access_control:permission_atom()) ->
+ boolean() | {'error', any()}.
+
+-else.
+
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
- [
- %% A description proplist as with auth mechanisms,
- %% exchanges. Currently unused.
- {description, 0},
-
- %% Check a user can log in, given a username and a proplist of
- %% authentication information (e.g. [{password, Password}]).
- %%
- %% Possible responses:
- %% {ok, User}
- %% Authentication succeeded, and here's the user record.
- %% {error, Error}
- %% Something went wrong. Log and die.
- %% {refused, Msg, Args}
- %% Client failed authentication. Log and die.
- {check_user_login, 2},
-
- %% Given #user and vhost, can a user log in to a vhost?
- %% Possible responses:
- %% true
- %% false
- %% {error, Error}
- %% Something went wrong. Log and die.
- {check_vhost_access, 2},
-
- %% Given #user, resource and permission, can a user access a resource?
- %%
- %% Possible responses:
- %% true
- %% false
- %% {error, Error}
- %% Something went wrong. Log and die.
- {check_resource_access, 3}
- ];
+ [{description, 0}, {check_user_login, 2}, {check_vhost_access, 2},
+ {check_resource_access, 3}];
behaviour_info(_Other) ->
undefined.
+
+-endif.
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index 3ef81d32..7b9df81e 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -32,8 +32,6 @@
vhost_perms_info_keys/0, user_perms_info_keys/0,
user_vhost_perms_info_keys/0]).
--include("rabbit_auth_backend_spec.hrl").
-
-ifdef(use_specs).
-type(regexp() :: binary()).
diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl
index 0c8251b8..eda6a743 100644
--- a/src/rabbit_auth_mechanism.erl
+++ b/src/rabbit_auth_mechanism.erl
@@ -16,31 +16,41 @@
-module(rabbit_auth_mechanism).
+-ifdef(use_specs).
+
+%% A description.
+-callback description() -> [proplist:property()].
+
+%% If this mechanism is enabled, should it be offered for a given socket?
+%% (primarily so EXTERNAL can be SSL-only)
+-callback should_offer(rabbit_net:socket()) -> boolean().
+
+%% Called before authentication starts. Should create a state
+%% object to be passed through all the stages of authentication.
+-callback init(rabbit_net:socket()) -> any().
+
+%% Handle a stage of authentication. Possible responses:
+%% {ok, User}
+%% Authentication succeeded, and here's the user record.
+%% {challenge, Challenge, NextState}
+%% Another round is needed. Here's the state I want next time.
+%% {protocol_error, Msg, Args}
+%% Client got the protocol wrong. Log and die.
+%% {refused, Msg, Args}
+%% Client failed authentication. Log and die.
+-callback handle_response(binary(), any()) ->
+ {'ok', rabbit_types:user()} |
+ {'challenge', binary(), any()} |
+ {'protocol_error', string(), [any()]} |
+ {'refused', string(), [any()]}.
+
+-else.
+
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
- [
- %% A description.
- {description, 0},
-
- %% If this mechanism is enabled, should it be offered for a given socket?
- %% (primarily so EXTERNAL can be SSL-only)
- {should_offer, 1},
-
- %% Called before authentication starts. Should create a state
- %% object to be passed through all the stages of authentication.
- {init, 1},
-
- %% Handle a stage of authentication. Possible responses:
- %% {ok, User}
- %% Authentication succeeded, and here's the user record.
- %% {challenge, Challenge, NextState}
- %% Another round is needed. Here's the state I want next time.
- %% {protocol_error, Msg, Args}
- %% Client got the protocol wrong. Log and die.
- %% {refused, Msg, Args}
- %% Client failed authentication. Log and die.
- {handle_response, 2}
- ];
+ [{description, 0}, {should_offer, 1}, {init, 1}, {handle_response, 2}];
behaviour_info(_Other) ->
undefined.
+
+-endif.
diff --git a/src/rabbit_auth_mechanism_amqplain.erl b/src/rabbit_auth_mechanism_amqplain.erl
index 3de6e7a6..c0d86cd1 100644
--- a/src/rabbit_auth_mechanism_amqplain.erl
+++ b/src/rabbit_auth_mechanism_amqplain.erl
@@ -21,8 +21,6 @@
-export([description/0, should_offer/1, init/1, handle_response/2]).
--include("rabbit_auth_mechanism_spec.hrl").
-
-rabbit_boot_step({?MODULE,
[{description, "auth mechanism amqplain"},
{mfa, {rabbit_registry, register,
diff --git a/src/rabbit_auth_mechanism_cr_demo.erl b/src/rabbit_auth_mechanism_cr_demo.erl
index 64b01d8e..5df1d5d7 100644
--- a/src/rabbit_auth_mechanism_cr_demo.erl
+++ b/src/rabbit_auth_mechanism_cr_demo.erl
@@ -21,8 +21,6 @@
-export([description/0, should_offer/1, init/1, handle_response/2]).
--include("rabbit_auth_mechanism_spec.hrl").
-
-rabbit_boot_step({?MODULE,
[{description, "auth mechanism cr-demo"},
{mfa, {rabbit_registry, register,
diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl
index 19fb5875..423170e1 100644
--- a/src/rabbit_auth_mechanism_plain.erl
+++ b/src/rabbit_auth_mechanism_plain.erl
@@ -21,8 +21,6 @@
-export([description/0, should_offer/1, init/1, handle_response/2]).
--include("rabbit_auth_mechanism_spec.hrl").
-
-rabbit_boot_step({?MODULE,
[{description, "auth mechanism plain"},
{mfa, {rabbit_registry, register,
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 42627aae..6cc1c3fd 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -16,164 +16,200 @@
-module(rabbit_backing_queue).
+-ifdef(use_specs).
+
+%% We can't specify a per-queue ack/state with callback signatures
+-type(ack() :: any()).
+-type(state() :: any()).
+
+-type(fetch_result(Ack) ::
+ ('empty' |
+ %% Message, IsDelivered, AckTag, Remaining_Len
+ {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})).
+-type(is_durable() :: boolean()).
+-type(attempt_recovery() :: boolean()).
+-type(purged_msg_count() :: non_neg_integer()).
+-type(confirm_required() :: boolean()).
+-type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
+-type(duration() :: ('undefined' | 'infinity' | number())).
+
+-type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') |
+ 'undefined').
+
+%% Called on startup with a list of durable queue names. The queues
+%% aren't being started at this point, but this call allows the
+%% backing queue to perform any checking necessary for the consistency
+%% of those queues, or initialise any other shared resources.
+-callback start([rabbit_amqqueue:name()]) -> 'ok'.
+
+%% Called to tear down any state/resources. NB: Implementations should
+%% not depend on this function being called on shutdown and instead
+%% should hook into the rabbit supervision hierarchy.
+-callback stop() -> 'ok'.
+
+%% Initialise the backing queue and its state.
+%%
+%% Takes
+%% 1. the amqqueue record
+%% 2. a boolean indicating whether the queue is an existing queue that
+%% should be recovered
+%% 3. an asynchronous callback which accepts a function of type
+%% backing-queue-state to backing-queue-state. This callback
+%% function can be safely invoked from any process, which makes it
+%% useful for passing messages back into the backing queue,
+%% especially as the backing queue does not have control of its own
+%% mailbox.
+-callback init(rabbit_types:amqqueue(), attempt_recovery(),
+ async_callback()) -> state().
+
+%% Called on queue shutdown when queue isn't being deleted.
+-callback terminate(any(), state()) -> state().
+
+%% Called when the queue is terminating and needs to delete all its
+%% content.
+-callback delete_and_terminate(any(), state()) -> state().
+
+%% Remove all messages in the queue, but not messages which have been
+%% fetched and are pending acks.
+-callback purge(state()) -> {purged_msg_count(), state()}.
+
+%% Publish a message.
+-callback publish(rabbit_types:basic_message(),
+ rabbit_types:message_properties(), pid(), state()) ->
+ state().
+
+%% Called for messages which have already been passed straight
+%% out to a client. The queue will be empty for these calls
+%% (i.e. saves the round trip through the backing queue).
+-callback publish_delivered(true, rabbit_types:basic_message(),
+ rabbit_types:message_properties(), pid(), state())
+ -> {ack(), state()};
+ (false, rabbit_types:basic_message(),
+ rabbit_types:message_properties(), pid(), state())
+ -> {undefined, state()}.
+
+%% Return ids of messages which have been confirmed since the last
+%% invocation of this function (or initialisation).
+%%
+%% Message ids should only appear in the result of drain_confirmed
+%% under the following circumstances:
+%%
+%% 1. The message appears in a call to publish_delivered/4 and the
+%% first argument (ack_required) is false; or
+%% 2. The message is fetched from the queue with fetch/2 and the first
+%% argument (ack_required) is false; or
+%% 3. The message is acked (ack/2 is called for the message); or
+%% 4. The message is fully fsync'd to disk in such a way that the
+%% recovery of the message is guaranteed in the event of a crash of
+%% this rabbit node (excluding hardware failure).
+%%
+%% In addition to the above conditions, a message id may only appear
+%% in the result of drain_confirmed if
+%% #message_properties.needs_confirming = true when the msg was
+%% published (through whichever means) to the backing queue.
+%%
+%% It is legal for the same message id to appear in the results of
+%% multiple calls to drain_confirmed, which means that the backing
+%% queue is not required to keep track of which messages it has
+%% already confirmed. The confirm will be issued to the publisher the
+%% first time the message id appears in the result of
+%% drain_confirmed. All subsequent appearances of that message id will
+%% be ignored.
+-callback drain_confirmed(state()) -> {[rabbit_guid:guid()], state()}.
+
+%% Drop messages from the head of the queue while the supplied
+%% predicate returns true. A callback function is supplied allowing
+%% callers access to messages that are about to be dropped.
+-callback dropwhile(fun ((rabbit_types:message_properties()) -> boolean()), msg_fun(),
+ state())
+ -> state().
+
+%% Produce the next message.
+-callback fetch(true, state()) -> {fetch_result(ack()), state()};
+ (false, state()) -> {fetch_result(undefined), state()}.
+
+%% Acktags supplied are for messages which can now be forgotten
+%% about. Must return 1 msg_id per Ack, in the same order as Acks.
+-callback ack([ack()], state()) -> {[rabbit_guid:guid()], state()}.
+
+%% Acktags supplied are for messages which should be processed. The
+%% provided callback function is called with each message.
+-callback fold(msg_fun(), state(), [ack()]) -> state().
+
+%% Reinsert messages into the queue which have already been delivered
+%% and were pending acknowledgement.
+-callback requeue([ack()], state()) -> {[rabbit_guid:guid()], state()}.
+
+%% How long is my queue?
+-callback len(state()) -> non_neg_integer().
+
+%% Is my queue empty?
+-callback is_empty(state()) -> boolean().
+
+%% For the next three functions, the assumption is that you're
+%% monitoring something like the ingress and egress rates of the
+%% queue. The RAM duration is thus the length of time represented by
+%% the messages held in RAM given the current rates. If you want to
+%% ignore all of this stuff, then do so, and return 0 in
+%% ram_duration/1.
+
+%% The target is to have no more messages in RAM than indicated by the
+%% duration and the current queue rates.
+-callback set_ram_duration_target(duration(), state()) -> state().
+
+%% Optionally recalculate the duration internally (likely to be just
+%% update your internal rates), and report how many seconds the
+%% messages in RAM represent given the current rates of the queue.
+-callback ram_duration(state()) -> {duration(), state()}.
+
+%% Should 'timeout' be called as soon as the queue process can manage
+%% (either on an empty mailbox, or when a timer fires)?
+-callback needs_timeout(state()) -> 'false' | 'timed' | 'idle'.
+
+%% Called (eventually) after needs_timeout returns 'idle' or 'timed'.
+%% Note this may be called more than once for each 'idle' or 'timed'
+%% returned from needs_timeout
+-callback timeout(state()) -> state().
+
+%% Called immediately before the queue hibernates.
+-callback handle_pre_hibernate(state()) -> state().
+
+%% Exists for debugging purposes, to be able to expose state via
+%% rabbitmqctl list_queues backing_queue_status
+-callback status(state()) -> [{atom(), any()}].
+
+%% Passed a function to be invoked with the relevant backing queue's
+%% state. Useful for when the backing queue or other components need
+%% to pass functions into the backing queue.
+-callback invoke(atom(), fun ((atom(), A) -> A), state()) -> state().
+
+%% Called prior to a publish or publish_delivered call. Allows the BQ
+%% to signal that it's already seen this message (and in what capacity
+%% - i.e. was it published previously or discarded previously) and
+%% thus the message should be dropped.
+-callback is_duplicate(rabbit_types:basic_message(), state())
+ -> {'false'|'published'|'discarded', state()}.
+
+%% Called to inform the BQ about messages which have reached the
+%% queue, but are not going to be further passed to BQ for some
+%% reason. Note that this is may be invoked for messages for which
+%% BQ:is_duplicate/2 has already returned {'published' | 'discarded',
+%% BQS}.
+-callback discard(rabbit_types:basic_message(), pid(), state()) -> state().
+
+-else.
+
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
- [
- %% Called on startup with a list of durable queue names. The
- %% queues aren't being started at this point, but this call
- %% allows the backing queue to perform any checking necessary for
- %% the consistency of those queues, or initialise any other
- %% shared resources.
- {start, 1},
-
- %% Called to tear down any state/resources. NB: Implementations
- %% should not depend on this function being called on shutdown
- %% and instead should hook into the rabbit supervision hierarchy.
- {stop, 0},
-
- %% Initialise the backing queue and its state.
- %%
- %% Takes
- %% 1. the amqqueue record
- %% 2. a boolean indicating whether the queue is an existing queue
- %% that should be recovered
- %% 3. an asynchronous callback which accepts a function of type
- %% backing-queue-state to backing-queue-state. This callback
- %% function can be safely invoked from any process, which
- %% makes it useful for passing messages back into the backing
- %% queue, especially as the backing queue does not have
- %% control of its own mailbox.
- {init, 3},
-
- %% Called on queue shutdown when queue isn't being deleted.
- {terminate, 2},
-
- %% Called when the queue is terminating and needs to delete all
- %% its content.
- {delete_and_terminate, 2},
-
- %% Remove all messages in the queue, but not messages which have
- %% been fetched and are pending acks.
- {purge, 1},
-
- %% Publish a message.
- {publish, 4},
-
- %% Called for messages which have already been passed straight
- %% out to a client. The queue will be empty for these calls
- %% (i.e. saves the round trip through the backing queue).
- {publish_delivered, 5},
-
- %% Return ids of messages which have been confirmed since
- %% the last invocation of this function (or initialisation).
- %%
- %% Message ids should only appear in the result of
- %% drain_confirmed under the following circumstances:
- %%
- %% 1. The message appears in a call to publish_delivered/4 and
- %% the first argument (ack_required) is false; or
- %% 2. The message is fetched from the queue with fetch/2 and the
- %% first argument (ack_required) is false; or
- %% 3. The message is acked (ack/2 is called for the message); or
- %% 4. The message is fully fsync'd to disk in such a way that the
- %% recovery of the message is guaranteed in the event of a
- %% crash of this rabbit node (excluding hardware failure).
- %%
- %% In addition to the above conditions, a message id may only
- %% appear in the result of drain_confirmed if
- %% #message_properties.needs_confirming = true when the msg was
- %% published (through whichever means) to the backing queue.
- %%
- %% It is legal for the same message id to appear in the results
- %% of multiple calls to drain_confirmed, which means that the
- %% backing queue is not required to keep track of which messages
- %% it has already confirmed. The confirm will be issued to the
- %% publisher the first time the message id appears in the result
- %% of drain_confirmed. All subsequent appearances of that message
- %% id will be ignored.
- {drain_confirmed, 1},
-
- %% Drop messages from the head of the queue while the supplied
- %% predicate returns true. A callback function is supplied
- %% allowing callers access to messages that are about to be
- %% dropped.
- {dropwhile, 3},
-
- %% Produce the next message.
- {fetch, 2},
-
- %% Acktags supplied are for messages which can now be forgotten
- %% about. Must return 1 msg_id per Ack, in the same order as
- %% Acks.
- {ack, 2},
-
- %% Acktags supplied are for messages which should be
- %% processed. The provided callback function is called with each
- %% message.
- {fold, 3},
-
- %% Reinsert messages into the queue which have already been
- %% delivered and were pending acknowledgement.
- {requeue, 2},
-
- %% How long is my queue?
- {len, 1},
-
- %% Is my queue empty?
- {is_empty, 1},
-
- %% For the next three functions, the assumption is that you're
- %% monitoring something like the ingress and egress rates of the
- %% queue. The RAM duration is thus the length of time represented
- %% by the messages held in RAM given the current rates. If you
- %% want to ignore all of this stuff, then do so, and return 0 in
- %% ram_duration/1.
-
- %% The target is to have no more messages in RAM than indicated
- %% by the duration and the current queue rates.
- {set_ram_duration_target, 2},
-
- %% Optionally recalculate the duration internally (likely to be
- %% just update your internal rates), and report how many seconds
- %% the messages in RAM represent given the current rates of the
- %% queue.
- {ram_duration, 1},
-
- %% Should 'timeout' be called as soon as the queue process
- %% can manage (either on an empty mailbox, or when a timer
- %% fires)?
- {needs_timeout, 1},
-
- %% Called (eventually) after needs_timeout returns 'idle' or
- %% 'timed'. Note this may be called more than once for each
- %% 'idle' or 'timed' returned from needs_timeout.
- {timeout, 1},
-
- %% Called immediately before the queue hibernates.
- {handle_pre_hibernate, 1},
-
- %% Exists for debugging purposes, to be able to expose state via
- %% rabbitmqctl list_queues backing_queue_status
- {status, 1},
-
- %% Passed a function to be invoked with the relevant backing
- %% queue's state. Useful for when the backing queue or other
- %% components need to pass functions into the backing queue.
- {invoke, 3},
-
- %% Called prior to a publish or publish_delivered call. Allows
- %% the BQ to signal that it's already seen this message (and in
- %% what capacity - i.e. was it published previously or discarded
- %% previously) and thus the message should be dropped.
- {is_duplicate, 2},
-
- %% Called to inform the BQ about messages which have reached the
- %% queue, but are not going to be further passed to BQ for some
- %% reason. Note that this is may be invoked for messages for
- %% which BQ:is_duplicate/2 has already returned {'published' |
- %% 'discarded', BQS}.
- {discard, 3}
- ];
+ [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
+ {delete_and_terminate, 2}, {purge, 1}, {publish, 4},
+ {publish_delivered, 5}, {drain_confirmed, 1}, {dropwhile, 3},
+ {fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1},
+ {is_empty, 1}, {set_ram_duration_target, 2}, {ram_duration, 1},
+ {needs_timeout, 1}, {timeout, 1}, {handle_pre_hibernate, 1},
+ {status, 1}, {invoke, 3}, {is_duplicate, 2}, {discard, 3}];
behaviour_info(_Other) ->
undefined.
+
+-endif.
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index 7b00fa5f..286b69e4 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -141,7 +141,7 @@ qc_drain_confirmed(#state{bqstate = BQ}) ->
{call, ?BQMOD, drain_confirmed, [BQ]}.
qc_dropwhile(#state{bqstate = BQ}) ->
- {call, ?BQMOD, dropwhile, [fun dropfun/1, BQ]}.
+ {call, ?BQMOD, dropwhile, [fun dropfun/1, fun (_,_) -> ok end, BQ]}.
qc_is_empty(#state{bqstate = BQ}) ->
{call, ?BQMOD, is_empty, [BQ]}.
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index cc876cb4..8ad59016 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -20,7 +20,7 @@
-export([publish/4, publish/6, publish/1,
message/3, message/4, properties/1, append_table_header/3,
- map_headers/2, delivery/4, header_routes/1]).
+ extract_headers/1, map_headers/2, delivery/4, header_routes/1]).
-export([build_content/2, from_content/1]).
%%----------------------------------------------------------------------------
@@ -61,6 +61,8 @@
-spec(append_table_header/3 ::
(binary(), rabbit_framing:amqp_table(), headers()) -> headers()).
+-spec(extract_headers/1 :: (rabbit_types:content()) -> headers()).
+
-spec(map_headers/2 :: (rabbit_types:content(), fun((headers()) -> headers()))
-> rabbit_types:content()).
@@ -186,6 +188,11 @@ append_table_header(Name, Info, Headers) ->
end,
rabbit_misc:set_table_value(Headers, Name, array, [{table, Info} | Prior]).
+extract_headers(Content) ->
+ #content{properties = #'P_basic'{headers = Headers}} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ Headers.
+
map_headers(F, Content) ->
Content1 = rabbit_binary_parser:ensure_content_decoded(Content),
#content{properties = #'P_basic'{headers = Headers} = Props} = Content1,
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 4a0e93be..846890a1 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -194,7 +194,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
- queue_monitors = sets:new(),
+ queue_monitors = pmon:new(),
consumer_mapping = dict:new(),
blocking = sets:new(),
queue_consumers = dict:new(),
@@ -333,8 +333,8 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State3 = handle_consuming_queue_down(QPid, State2),
credit_flow:peer_down(QPid),
erase_queue_stats(QPid),
- noreply(State3#ch{queue_monitors =
- sets:del_element(QPid, State3#ch.queue_monitors)});
+ noreply(State3#ch{queue_monitors = pmon:erase(
+ QPid, State3#ch.queue_monitors)});
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -758,9 +758,7 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
fun () -> {error, not_found} end,
fun () ->
rabbit_amqqueue:basic_cancel(
- Q, self(), ConsumerTag,
- ok_msg(NoWait, #'basic.cancel_ok'{
- consumer_tag = ConsumerTag}))
+ Q, self(), ConsumerTag, ok_msg(NoWait, OkMsg))
end) of
ok ->
{noreply, NewState};
@@ -937,7 +935,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
{error, not_found} ->
case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
Args, Owner) of
- {new, Q = #amqqueue{}} ->
+ {new, #amqqueue{pid = QPid}} ->
%% We need to notify the reader within the channel
%% process so that we can be sure there are no
%% outstanding exclusive queues being declared as
@@ -945,7 +943,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
ok = case Owner of
none -> ok;
_ -> rabbit_queue_collector:register(
- CollectorPid, Q)
+ CollectorPid, QPid)
end,
return_queue_declare_ok(QueueName, NoWait, 0, 0, State);
{existing, _Q} ->
@@ -1091,6 +1089,7 @@ handle_method(_MethodRecord, _Content, _State) ->
consumer_monitor(ConsumerTag,
State = #ch{consumer_mapping = ConsumerMapping,
+ queue_monitors = QMons,
queue_consumers = QCons,
capabilities = Capabilities}) ->
case rabbit_misc:table_lookup(
@@ -1103,24 +1102,19 @@ consumer_monitor(ConsumerTag,
end,
gb_sets:singleton(ConsumerTag),
QCons),
- monitor_queue(QPid, State#ch{queue_consumers = QCons1});
+ State#ch{queue_monitors = pmon:monitor(QPid, QMons),
+ queue_consumers = QCons1};
_ ->
State
end.
-monitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
- case sets:is_element(QPid, QMons) of
- false -> erlang:monitor(process, QPid),
- State#ch{queue_monitors = sets:add_element(QPid, QMons)};
- true -> State
- end.
-
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) ->
- {MXs, UC1} = dtree:take(QPid, UC),
- (case rabbit_misc:is_abnormal_termination(Reason) of
- true -> fun send_nacks/2;
- false -> fun record_confirms/2
- end)(MXs, State#ch{unconfirmed = UC1}).
+ case rabbit_misc:is_abnormal_termination(Reason) of
+ true -> {MXs, UC1} = dtree:take_all(QPid, UC),
+ send_nacks(MXs, State#ch{unconfirmed = UC1});
+ false -> {MXs, UC1} = dtree:take(QPid, UC),
+ record_confirms(MXs, State#ch{unconfirmed = UC1})
+ end.
handle_consuming_queue_down(QPid,
State = #ch{consumer_mapping = ConsumerMapping,
@@ -1323,7 +1317,9 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
QNames}, State) ->
{RoutingRes, DeliveredQPids} =
rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery),
- State1 = lists:foldl(fun monitor_queue/2, State, DeliveredQPids),
+ State1 = State#ch{queue_monitors =
+ pmon:monitor_all(DeliveredQPids,
+ State#ch.queue_monitors)},
State2 = process_routing_result(RoutingRes, DeliveredQPids,
XName, MsgSeqNo, Message, State1),
maybe_incr_stats([{XName, 1} |
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 83e28c44..910a89b4 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -242,6 +242,11 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
+%% Optimisation
+route(#exchange{name = #resource{name = <<"">>, virtual_host = VHost}},
+ #delivery{message = #basic_message{routing_keys = RKs}}) ->
+ [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)];
+
route(X = #exchange{name = XName}, Delivery) ->
route1(Delivery, {queue:from_list([X]), XName, []}).
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index 44a08e24..1027570c 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -16,39 +16,57 @@
-module(rabbit_exchange_type).
--export([behaviour_info/1]).
+-ifdef(use_specs).
-behaviour_info(callbacks) ->
- [
- {description, 0},
+-type(tx() :: 'transaction' | 'none').
+-type(serial() :: pos_integer() | tx()).
+
+-callback description() -> [proplist:property()].
+
+%% Should Rabbit ensure that all binding events that are
+%% delivered to an individual exchange can be serialised? (they
+%% might still be delivered out of order, but there'll be a
+%% serial number).
+-callback serialise_events() -> boolean().
- %% Should Rabbit ensure that all binding events that are
- %% delivered to an individual exchange can be serialised? (they
- %% might still be delivered out of order, but there'll be a
- %% serial number).
- {serialise_events, 0},
+%% The no_return is there so that we can have an "invalid" exchange
+%% type (see rabbit_exchange_type_invalid).
+-callback route(rabbit_types:exchange(), rabbit_types:delivery()) ->
+ rabbit_router:match_result().
- {route, 2},
+%% called BEFORE declaration, to check args etc; may exit with #amqp_error{}
+-callback validate(rabbit_types:exchange()) -> 'ok'.
- %% called BEFORE declaration, to check args etc; may exit with #amqp_error{}
- {validate, 1},
+%% called after declaration and recovery
+-callback create(tx(), rabbit_types:exchange()) -> 'ok'.
- %% called after declaration and recovery
- {create, 2},
+%% called after exchange (auto)deletion.
+-callback delete(tx(), rabbit_types:exchange(), [rabbit_types:binding()]) ->
+ 'ok'.
- %% called after exchange (auto)deletion.
- {delete, 3},
+%% called after a binding has been added or recovered
+-callback add_binding(serial(), rabbit_types:exchange(),
+ rabbit_types:binding()) -> 'ok'.
- %% called after a binding has been added or recovered
- {add_binding, 3},
+%% called after bindings have been deleted.
+-callback remove_bindings(serial(), rabbit_types:exchange(),
+ [rabbit_types:binding()]) -> 'ok'.
- %% called after bindings have been deleted.
- {remove_bindings, 3},
+%% called when comparing exchanges for equivalence - should return ok or
+%% exit with #amqp_error{}
+-callback assert_args_equivalence (rabbit_types:exchange(),
+ rabbit_framing:amqp_table()) ->
+ 'ok' | rabbit_types:connection_exit().
- %% called when comparing exchanges for equivalence - should return ok or
- %% exit with #amqp_error{}
- {assert_args_equivalence, 2}
+-else.
- ];
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [{description, 0}, {serialise_events, 0}, {route, 2}, {validate, 1},
+ {create, 2}, {delete, 3}, {add_binding, 3}, {remove_bindings, 3},
+ {assert_args_equivalence, 2}];
behaviour_info(_Other) ->
undefined.
+
+-endif.
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index 4bce42d4..cdec1cb9 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -22,7 +22,6 @@
-export([description/0, serialise_events/0, route/2]).
-export([validate/1, create/2, delete/3,
add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
--include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
[{description, "exchange type direct"},
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index cc3fb87c..a64f2c29 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -22,7 +22,6 @@
-export([description/0, serialise_events/0, route/2]).
-export([validate/1, create/2, delete/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
--include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
[{description, "exchange type fanout"},
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index de9979b4..61917d8f 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -23,7 +23,6 @@
-export([description/0, serialise_events/0, route/2]).
-export([validate/1, create/2, delete/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
--include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
[{description, "exchange type headers"},
diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl
index 8f60f7d8..82d27960 100644
--- a/src/rabbit_exchange_type_invalid.erl
+++ b/src/rabbit_exchange_type_invalid.erl
@@ -22,7 +22,6 @@
-export([description/0, serialise_events/0, route/2]).
-export([validate/1, create/2, delete/3,
add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
--include("rabbit_exchange_type_spec.hrl").
description() ->
[{name, <<"invalid">>},
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 84f4f8a9..3160fdf4 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -23,7 +23,6 @@
-export([description/0, serialise_events/0, route/2]).
-export([validate/1, create/2, delete/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
--include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
[{description, "exchange type topic"},
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index d0b5bab7..2d155d14 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -328,7 +328,7 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) ->
ensure_gm_heartbeat(),
{ok, #state { q = Q,
gm = GM1,
- monitors = dict:new(),
+ monitors = pmon:new(),
death_fun = DeathFun,
length_fun = LengthFun },
hibernate,
@@ -353,17 +353,8 @@ handle_cast(request_length, State = #state { length_fun = LengthFun }) ->
ok = LengthFun(),
noreply(State);
-handle_cast({ensure_monitoring, Pids},
- State = #state { monitors = Monitors }) ->
- Monitors1 =
- lists:foldl(fun (Pid, MonitorsN) ->
- case dict:is_key(Pid, MonitorsN) of
- true -> MonitorsN;
- false -> MRef = erlang:monitor(process, Pid),
- dict:store(Pid, MRef, MonitorsN)
- end
- end, Monitors, Pids),
- noreply(State #state { monitors = Monitors1 }).
+handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) ->
+ noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }).
handle_info(send_gm_heartbeat, State = #state{gm = GM}) ->
gm:broadcast(GM, heartbeat),
@@ -371,12 +362,12 @@ handle_info(send_gm_heartbeat, State = #state{gm = GM}) ->
noreply(State);
handle_info({'DOWN', _MonitorRef, process, Pid, _Reason},
- State = #state { monitors = Monitors,
+ State = #state { monitors = Mons,
death_fun = DeathFun }) ->
- noreply(case dict:is_key(Pid, Monitors) of
+ noreply(case pmon:is_monitored(Pid, Mons) of
false -> State;
true -> ok = DeathFun(Pid),
- State #state { monitors = dict:erase(Pid, Monitors) }
+ State #state { monitors = pmon:erase(Pid, Mons) }
end);
handle_info(Msg, State) ->
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index daeb7d85..1cb58569 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -59,10 +59,6 @@
known_senders :: set()
}).
--type(ack() :: non_neg_integer()).
--type(state() :: master_state()).
--include("rabbit_backing_queue_spec.hrl").
-
-spec(promote_backing_queue_state/6 ::
(pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()).
-spec(sender_death_fun/0 :: () -> death_fun()).
@@ -177,11 +173,12 @@ dropwhile(Pred, MsgFun,
backing_queue = BQ,
set_delivered = SetDelivered,
backing_queue_state = BQS }) ->
- Len = BQ:len(BQS),
+ Len = BQ:len(BQS),
BQS1 = BQ:dropwhile(Pred, MsgFun, BQS),
- Dropped = Len - BQ:len(BQS1),
+ Len1 = BQ:len(BQS1),
+ ok = gm:broadcast(GM, {set_length, Len1}),
+ Dropped = Len - Len1,
SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
- ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}),
State #state { backing_queue_state = BQS1,
set_delivered = SetDelivered1 }.
@@ -241,11 +238,11 @@ ack(AckTags, State = #state { gm = GM,
backing_queue_state = BQS,
ack_msg_id = AM }) ->
{MsgIds, BQS1} = BQ:ack(AckTags, BQS),
- AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
case MsgIds of
[] -> ok;
_ -> ok = gm:broadcast(GM, {ack, MsgIds})
end,
+ AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
{MsgIds, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 }}.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index eb1da1e8..a7a1273d 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -140,7 +140,7 @@ init(#amqqueue { name = QueueName } = Q) ->
ack_num = 0,
msg_id_status = dict:new(),
- known_senders = dict:new(),
+ known_senders = pmon:new(),
synchronised = false
},
@@ -286,7 +286,7 @@ terminate(Reason, #state { q = Q,
rate_timer_ref = RateTRef }) ->
ok = gm:leave(GM),
QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
- Q, BQ, BQS, RateTRef, [], [], dict:new()),
+ Q, BQ, BQS, RateTRef, [], [], pmon:new(), dict:new()),
rabbit_amqqueue_process:terminate(Reason, QueueState);
terminate([_SPid], _Reason) ->
%% gm case
@@ -459,12 +459,8 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
%% Everything that we're monitoring, we need to ensure our new
%% coordinator is monitoring.
-
- MonitoringPids = [begin put({ch_publisher, Pid}, MRef),
- Pid
- end || {Pid, MRef} <- dict:to_list(KS)],
- ok = rabbit_mirror_queue_coordinator:ensure_monitoring(
- CPid, MonitoringPids),
+ MPids = pmon:monitored(KS),
+ ok = rabbit_mirror_queue_coordinator:ensure_monitoring(CPid, MPids),
%% We find all the messages that we've received from channels but
%% not from gm, and if they're due to be enqueued on promotion
@@ -537,7 +533,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
Status =:= published orelse Status =:= confirmed]),
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
- CPid, BQ, BQS, GM, SS, MonitoringPids),
+ CPid, BQ, BQS, GM, SS, MPids),
MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) ->
gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
@@ -550,7 +546,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
{Delivery, true} <- queue:to_list(PubQ)],
QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
Q1, rabbit_mirror_queue_master, MasterState, RateTRef,
- AckTags, Deliveries, MTC),
+ AckTags, Deliveries, KS, MTC),
{become, rabbit_amqqueue_process, QueueState, hibernate}.
noreply(State) ->
@@ -605,14 +601,10 @@ stop_rate_timer(State = #state { rate_timer_ref = TRef }) ->
State #state { rate_timer_ref = undefined }.
ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
- case dict:is_key(ChPid, KS) of
- true -> State;
- false -> MRef = erlang:monitor(process, ChPid),
- State #state { known_senders = dict:store(ChPid, MRef, KS) }
- end.
+ State #state { known_senders = pmon:monitor(ChPid, KS) }.
local_sender_death(ChPid, State = #state { known_senders = KS }) ->
- ok = case dict:is_key(ChPid, KS) of
+ ok = case pmon:is_monitored(ChPid, KS) of
false -> ok;
true -> credit_flow:peer_down(ChPid),
confirm_sender_death(ChPid)
@@ -628,7 +620,7 @@ confirm_sender_death(Pid) ->
fun (?MODULE, State = #state { known_senders = KS,
gm = GM }) ->
%% We're running still as a slave
- ok = case dict:is_key(Pid, KS) of
+ ok = case pmon:is_monitored(Pid, KS) of
false -> ok;
true -> gm:broadcast(GM, {ensure_monitoring, [Pid]}),
confirm_sender_death(Pid)
@@ -866,21 +858,18 @@ process_instruction({sender_death, ChPid},
State = #state { sender_queues = SQ,
msg_id_status = MS,
known_senders = KS }) ->
- {ok, case dict:find(ChPid, KS) of
- error ->
- State;
- {ok, MRef} ->
- true = erlang:demonitor(MRef),
- MS1 = case dict:find(ChPid, SQ) of
- error ->
- MS;
- {ok, {_MQ, PendingCh}} ->
- lists:foldl(fun dict:erase/2, MS,
- sets:to_list(PendingCh))
- end,
- State #state { sender_queues = dict:erase(ChPid, SQ),
- msg_id_status = MS1,
- known_senders = dict:erase(ChPid, KS) }
+ {ok, case pmon:is_monitored(ChPid, KS) of
+ false -> State;
+ true -> MS1 = case dict:find(ChPid, SQ) of
+ error ->
+ MS;
+ {ok, {_MQ, PendingCh}} ->
+ lists:foldl(fun dict:erase/2, MS,
+ sets:to_list(PendingCh))
+ end,
+ State #state { sender_queues = dict:erase(ChPid, SQ),
+ msg_id_status = MS1,
+ known_senders = pmon:demonitor(ChPid, KS) }
end};
process_instruction({length, Length},
State = #state { backing_queue = BQ,
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index c1be7613..0aacd654 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -60,6 +60,7 @@
-export([multi_call/2]).
-export([quit/1]).
-export([os_cmd/1]).
+-export([gb_sets_difference/2]).
%%----------------------------------------------------------------------------
@@ -204,6 +205,7 @@
([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}).
-spec(quit/1 :: (integer() | string()) -> no_return()).
-spec(os_cmd/1 :: (string()) -> string()).
+-spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()).
-endif.
@@ -912,3 +914,6 @@ os_cmd(Command) ->
false -> throw({command_not_found, Exec});
_ -> os:cmd(Command)
end.
+
+gb_sets_difference(S1, S2) ->
+ gb_sets:fold(fun gb_sets:delete_any/2, S1, S2).
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 56265136..d69dad1f 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -1240,7 +1240,8 @@ client_confirm(CRef, MsgIds, ActionTaken, State) ->
case dict:find(CRef, CTM) of
{ok, Gs} -> MsgOnDiskFun(gb_sets:intersection(Gs, MsgIds),
ActionTaken),
- MsgIds1 = gb_sets:difference(Gs, MsgIds),
+ MsgIds1 = rabbit_misc:gb_sets_difference(
+ Gs, MsgIds),
case gb_sets:is_empty(MsgIds1) of
true -> dict:erase(CRef, CTM);
false -> dict:store(CRef, MsgIds1, CTM)
diff --git a/src/rabbit_msg_store_ets_index.erl b/src/rabbit_msg_store_ets_index.erl
index 9c31439f..3defeaaf 100644
--- a/src/rabbit_msg_store_ets_index.erl
+++ b/src/rabbit_msg_store_ets_index.erl
@@ -16,6 +16,8 @@
-module(rabbit_msg_store_ets_index).
+-include("rabbit_msg_store.hrl").
+
-behaviour(rabbit_msg_store_index).
-export([new/1, recover/1,
@@ -25,8 +27,6 @@
-define(MSG_LOC_NAME, rabbit_msg_store_ets_index).
-define(FILENAME, "msg_store_index.ets").
--include("rabbit_msg_store_index.hrl").
-
-record(state, { table, dir }).
new(Dir) ->
diff --git a/src/rabbit_msg_store_index.erl b/src/rabbit_msg_store_index.erl
index 2f36256c..6cc0b2a7 100644
--- a/src/rabbit_msg_store_index.erl
+++ b/src/rabbit_msg_store_index.erl
@@ -16,6 +16,31 @@
-module(rabbit_msg_store_index).
+-include("rabbit_msg_store.hrl").
+
+-ifdef(use_specs).
+
+-type(dir() :: any()).
+-type(index_state() :: any()).
+-type(keyvalue() :: any()).
+-type(fieldpos() :: non_neg_integer()).
+-type(fieldvalue() :: any()).
+
+-callback new(dir()) -> index_state().
+-callback recover(dir()) -> rabbit_types:ok_or_error2(index_state(), any()).
+-callback lookup(rabbit_types:msg_id(), index_state()) -> ('not_found' | keyvalue()).
+-callback insert(keyvalue(), index_state()) -> 'ok'.
+-callback update(keyvalue(), index_state()) -> 'ok'.
+-callback update_fields(rabbit_types:msg_id(), ({fieldpos(), fieldvalue()} |
+ [{fieldpos(), fieldvalue()}]),
+ index_state()) -> 'ok'.
+-callback delete(rabbit_types:msg_id(), index_state()) -> 'ok'.
+-callback delete_object(keyvalue(), index_state()) -> 'ok'.
+-callback delete_by_file(fieldvalue(), index_state()) -> 'ok'.
+-callback terminate(index_state()) -> any().
+
+-else.
+
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
@@ -30,3 +55,5 @@ behaviour_info(callbacks) ->
{terminate, 1}];
behaviour_info(_Other) ->
undefined.
+
+-endif.
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 825d1bb1..f0c75d23 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -446,16 +446,19 @@ ipv6_status(TestPort) ->
{ok, LSock4} -> gen_tcp:close(LSock4),
single_stack;
%% IPv6-only machine. Welcome to the future.
- {error, eafnosupport} -> ipv6_only;
+ {error, eafnosupport} -> ipv6_only; %% Linux
+ {error, eprotonosupport}-> ipv6_only; %% FreeBSD
%% Dual stack machine with something already
%% on IPv4.
{error, _} -> ipv6_status(TestPort + 1)
end
end;
- {error, eafnosupport} ->
- %% IPv4-only machine. Welcome to the 90s.
+ %% IPv4-only machine. Welcome to the 90s.
+ {error, eafnosupport} -> %% Linux
ipv4_only;
+ {error, eprotonosupport} -> %% FreeBSD
+ ipv4_only;
+ %% Port in use
{error, _} ->
- %% Port in use
ipv6_status(TestPort + 1)
end.
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index df957d88..6dad01cc 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -23,7 +23,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--record(state, {queues, delete_from}).
+-record(state, {monitors, delete_from}).
-include("rabbit.hrl").
@@ -32,7 +32,7 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(register/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok').
+-spec(register/2 :: (pid(), pid()) -> 'ok').
-spec(delete_all/1 :: (pid()) -> 'ok').
-endif.
@@ -51,39 +51,37 @@ delete_all(CollectorPid) ->
%%----------------------------------------------------------------------------
init([]) ->
- {ok, #state{queues = dict:new(), delete_from = undefined}}.
+ {ok, #state{monitors = pmon:new(), delete_from = undefined}}.
%%--------------------------------------------------------------------------
-handle_call({register, Q}, _From,
- State = #state{queues = Queues, delete_from = Deleting}) ->
- MonitorRef = erlang:monitor(process, Q#amqqueue.pid),
+handle_call({register, QPid}, _From,
+ State = #state{monitors = QMons, delete_from = Deleting}) ->
case Deleting of
undefined -> ok;
- _ -> rabbit_amqqueue:delete_immediately(Q)
+ _ -> ok = rabbit_amqqueue:delete_immediately([QPid])
end,
- {reply, ok, State#state{queues = dict:store(MonitorRef, Q, Queues)}};
+ {reply, ok, State#state{monitors = pmon:monitor(QPid, QMons)}};
-handle_call(delete_all, From, State = #state{queues = Queues,
+handle_call(delete_all, From, State = #state{monitors = QMons,
delete_from = undefined}) ->
- case dict:size(Queues) of
- 0 -> {reply, ok, State#state{delete_from = From}};
- _ -> [rabbit_amqqueue:delete_immediately(Q)
- || {_MRef, Q} <- dict:to_list(Queues)],
- {noreply, State#state{delete_from = From}}
+ case pmon:monitored(QMons) of
+ [] -> {reply, ok, State#state{delete_from = From}};
+ QPids -> ok = rabbit_amqqueue:delete_immediately(QPids),
+ {noreply, State#state{delete_from = From}}
end.
handle_cast(Msg, State) ->
{stop, {unhandled_cast, Msg}, State}.
-handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason},
- State = #state{queues = Queues, delete_from = Deleting}) ->
- Queues1 = dict:erase(MonitorRef, Queues),
- case Deleting =/= undefined andalso dict:size(Queues1) =:= 0 of
+handle_info({'DOWN', _MRef, process, DownPid, _Reason},
+ State = #state{monitors = QMons, delete_from = Deleting}) ->
+ QMons1 = pmon:erase(DownPid, QMons),
+ case Deleting =/= undefined andalso pmon:is_empty(QMons1) of
true -> gen_server:reply(Deleting, ok);
false -> ok
end,
- {noreply, State#state{queues = Queues1}}.
+ {noreply, State#state{monitors = QMons1}}.
terminate(_Reason, _State) ->
ok.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index e356d7ff..c74b8d5f 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2306,8 +2306,8 @@ wait_for_confirms(Unconfirmed) ->
true -> ok;
false -> receive {'$gen_cast', {confirm, Confirmed, _}} ->
wait_for_confirms(
- gb_sets:difference(Unconfirmed,
- gb_sets:from_list(Confirmed)))
+ rabbit_misc:gb_sets_difference(
+ Unconfirmed, gb_sets:from_list(Confirmed)))
after 5000 -> exit(timeout_waiting_for_confirm)
end
end.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 46f6d6c1..f9315c5d 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -351,7 +351,7 @@
durable :: boolean(),
transient_threshold :: non_neg_integer(),
- async_callback :: async_callback(),
+ async_callback :: rabbit_backing_queue:async_callback(),
len :: non_neg_integer(),
persistent_count :: non_neg_integer(),
@@ -370,8 +370,6 @@
ack_in_counter :: non_neg_integer(),
ack_rates :: rates() }).
--include("rabbit_backing_queue_spec.hrl").
-
-spec(multiple_routing_keys/0 :: () -> 'ok').
-endif.
@@ -591,10 +589,10 @@ dropwhile(Pred, MsgFun, State) ->
{_, State2} = internal_fetch(false, MsgStatus, State1),
dropwhile(Pred, MsgFun, State2);
{true, _} ->
- {{_, _, AckTag, _}, State2} =
- internal_fetch(true, MsgStatus, State1),
- {MsgStatus, State3} = read_msg(MsgStatus, State2),
- MsgFun(MsgStatus#msg_status.msg, AckTag),
+ {MsgStatus1, State2} = read_msg(MsgStatus, State1),
+ {{Msg, _, AckTag, _}, State3} =
+ internal_fetch(true, MsgStatus1, State2),
+ MsgFun(Msg, AckTag),
dropwhile(Pred, MsgFun, State3);
{false, _} ->
a(in_r(MsgStatus, State1))
@@ -1291,10 +1289,11 @@ record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
unconfirmed = UC,
confirmed = C }) ->
- State #vqstate { msgs_on_disk = gb_sets:difference(MOD, MsgIdSet),
- msg_indices_on_disk = gb_sets:difference(MIOD, MsgIdSet),
- unconfirmed = gb_sets:difference(UC, MsgIdSet),
- confirmed = gb_sets:union (C, MsgIdSet) }.
+ State #vqstate {
+ msgs_on_disk = rabbit_misc:gb_sets_difference(MOD, MsgIdSet),
+ msg_indices_on_disk = rabbit_misc:gb_sets_difference(MIOD, MsgIdSet),
+ unconfirmed = rabbit_misc:gb_sets_difference(UC, MsgIdSet),
+ confirmed = gb_sets:union(C, MsgIdSet) }.
must_sync_index(#vqstate { msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 8dd8aba8..f1b74878 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -81,8 +81,6 @@
which_children/1, find_child/2,
check_childspecs/1]).
--export([behaviour_info/1]).
-
%% Internal exports
-export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3]).
-export([handle_cast/2]).
@@ -112,11 +110,144 @@
-define(is_terminate_simple(State),
State#state.strategy =:= simple_one_for_one_terminate).
+-ifdef(use_specs).
+
+%%--------------------------------------------------------------------------
+%% Types
+%%--------------------------------------------------------------------------
+
+-export_type([child_spec/0, startchild_ret/0, strategy/0, sup_name/0]).
+
+-type child() :: 'undefined' | pid().
+-type child_id() :: term().
+-type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | undefined}.
+-type modules() :: [module()] | 'dynamic'.
+-type delay() :: non_neg_integer().
+-type restart() :: 'permanent' | 'transient' | 'temporary' | 'intrinsic'
+ | {'permanent', delay()} | {'transient', delay()}
+ | {'intrinsic', delay()}.
+-type shutdown() :: 'brutal_kill' | timeout().
+-type worker() :: 'worker' | 'supervisor'.
+-type sup_name() :: {'local', Name :: atom()} | {'global', Name :: atom()}.
+-type sup_ref() :: (Name :: atom())
+ | {Name :: atom(), Node :: node()}
+ | {'global', Name :: atom()}
+ | pid().
+-type child_spec() :: {Id :: child_id(),
+ StartFunc :: mfargs(),
+ Restart :: restart(),
+ Shutdown :: shutdown(),
+ Type :: worker(),
+ Modules :: modules()}.
+
+
+-type strategy() :: 'one_for_all' | 'one_for_one'
+ | 'rest_for_one' | 'simple_one_for_one'
+ | 'simple_one_for_one_terminate'.
+
+-type child_rec() :: #child{pid :: child() | {restarting,pid()} | [pid()],
+ name :: child_id(),
+ mfa :: mfargs(),
+ restart_type :: restart(),
+ shutdown :: shutdown(),
+ child_type :: worker(),
+ modules :: modules()}.
+
+-type state() :: #state{strategy :: strategy(),
+ children :: [child_rec()],
+ dynamics :: ?DICT(),
+ intensity :: non_neg_integer(),
+ period :: pos_integer()}.
+
+%%--------------------------------------------------------------------------
+%% Callback behaviour
+%%--------------------------------------------------------------------------
+
+-callback init(Args :: term()) ->
+ {ok, {{RestartStrategy :: strategy(),
+ MaxR :: non_neg_integer(),
+ MaxT :: non_neg_integer()},
+ [ChildSpec :: child_spec()]}}
+ | ignore.
+
+%%--------------------------------------------------------------------------
+%% Specs
+%%--------------------------------------------------------------------------
+
+-type startchild_err() :: 'already_present'
+ | {'already_started', Child :: child()} | term().
+-type startchild_ret() :: {'ok', Child :: child()}
+ | {'ok', Child :: child(), Info :: term()}
+ | {'error', startchild_err()}.
+
+-spec start_child(SupRef, ChildSpec) -> startchild_ret() when
+ SupRef :: sup_ref(),
+ ChildSpec :: child_spec() | (List :: [term()]).
+
+-spec restart_child(SupRef, Id) -> Result when
+ SupRef :: sup_ref(),
+ Id :: child_id(),
+ Result :: {'ok', Child :: child()}
+ | {'ok', Child :: child(), Info :: term()}
+ | {'error', Error},
+ Error :: 'running' | 'not_found' | 'simple_one_for_one' | term().
+
+-spec delete_child(SupRef, Id) -> Result when
+ SupRef :: sup_ref(),
+ Id :: child_id(),
+ Result :: 'ok' | {'error', Error},
+ Error :: 'running' | 'not_found' | 'simple_one_for_one'.
+
+-spec terminate_child(SupRef, Id) -> Result when
+ SupRef :: sup_ref(),
+ Id :: pid() | child_id(),
+ Result :: 'ok' | {'error', Error},
+ Error :: 'not_found' | 'simple_one_for_one'.
+
+-spec which_children(SupRef) -> [{Id,Child,Type,Modules}] when
+ SupRef :: sup_ref(),
+ Id :: child_id() | 'undefined',
+ Child :: child(),
+ Type :: worker(),
+ Modules :: modules().
+
+-spec check_childspecs(ChildSpecs) -> Result when
+ ChildSpecs :: [child_spec()],
+ Result :: 'ok' | {'error', Error :: term()}.
+
+-type init_sup_name() :: sup_name() | 'self'.
+
+-type stop_rsn() :: 'shutdown' | {'bad_return', {module(),'init', term()}}
+ | {'bad_start_spec', term()} | {'start_spec', term()}
+ | {'supervisor_data', term()}.
+
+-spec init({init_sup_name(), module(), [term()]}) ->
+ {'ok', state()} | 'ignore' | {'stop', stop_rsn()}.
+
+-type call() :: 'which_children'.
+-spec handle_call(call(), term(), state()) -> {'reply', term(), state()}.
+
+-spec handle_cast('null', state()) -> {'noreply', state()}.
+
+-spec handle_info(term(), state()) ->
+ {'noreply', state()} | {'stop', 'shutdown', state()}.
+
+-spec terminate(term(), state()) -> 'ok'.
+
+-spec code_change(term(), state(), term()) ->
+ {'ok', state()} | {'error', term()}.
+
+-else.
+
+-export([behaviour_info/1]).
+
behaviour_info(callbacks) ->
[{init,1}];
behaviour_info(_Other) ->
undefined.
+-endif.
+
%%% ---------------------------------------------------
%%% This is a general process supervisor built upon gen_server.erl.
%%% Servers/processes should/could also be built using gen_server.erl.