summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Wragg <david@rabbitmq.com>2011-01-14 13:04:46 +0000
committerDavid Wragg <david@rabbitmq.com>2011-01-14 13:04:46 +0000
commit6d7d7eeb63d12ed41e378646dc6835f9160c8464 (patch)
tree2efc4ea43966319149a37f2860820d49264df532
parent42c352fc5069f5d3ef4a9462d51ad83ad874d7a5 (diff)
parent2450a19f4341c7cea84960d7e2ef8fd26b6eeb07 (diff)
downloadrabbitmq-server-6d7d7eeb63d12ed41e378646dc6835f9160c8464.tar.gz
Merge default into bug23568
-rw-r--r--docs/rabbitmqctl.1.xml20
-rw-r--r--include/rabbit_backing_queue_spec.hrl2
-rw-r--r--include/rabbit_exchange_type_spec.hrl8
-rw-r--r--scripts/rabbitmq-multi.bat2
-rw-r--r--scripts/rabbitmq-server.bat2
-rw-r--r--scripts/rabbitmqctl.bat2
-rw-r--r--src/rabbit_amqqueue.erl62
-rw-r--r--src/rabbit_amqqueue_process.erl150
-rw-r--r--src/rabbit_auth_backend_internal.erl2
-rw-r--r--src/rabbit_binary_generator.erl3
-rw-r--r--src/rabbit_binding.erl153
-rw-r--r--src/rabbit_channel.erl208
-rw-r--r--src/rabbit_channel_sup.erl11
-rw-r--r--src/rabbit_channel_sup_sup.erl2
-rw-r--r--src/rabbit_command_assembler.erl148
-rw-r--r--src/rabbit_connection_sup.erl1
-rw-r--r--src/rabbit_event.erl6
-rw-r--r--src/rabbit_exchange.erl84
-rw-r--r--src/rabbit_exchange_type.erl8
-rw-r--r--src/rabbit_exchange_type_direct.erl12
-rw-r--r--src/rabbit_exchange_type_fanout.erl12
-rw-r--r--src/rabbit_exchange_type_headers.erl12
-rw-r--r--src/rabbit_exchange_type_topic.erl12
-rw-r--r--src/rabbit_framing_channel.erl129
-rw-r--r--src/rabbit_misc.erl41
-rw-r--r--src/rabbit_msg_store.erl280
-rw-r--r--src/rabbit_net.erl10
-rw-r--r--src/rabbit_queue_index.erl26
-rw-r--r--src/rabbit_reader.erl156
-rw-r--r--src/rabbit_tests.erl19
-rw-r--r--src/rabbit_upgrade_functions.erl2
-rw-r--r--src/rabbit_variable_queue.erl122
-rw-r--r--src/rabbit_vhost.erl57
33 files changed, 987 insertions, 777 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 9df4c1a8..2152cab3 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1010,6 +1010,26 @@
connection is secured with SSL.</para></listitem>
</varlistentry>
<varlistentry>
+ <term>ssl_protocol</term>
+ <listitem><para>SSL protocol
+ (e.g. tlsv1)</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>ssl_key_exchange</term>
+ <listitem><para>SSL key exchange algorithm
+ (e.g. rsa)</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>ssl_cipher</term>
+ <listitem><para>SSL cipher algorithm
+ (e.g. aes_256_cbc)</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>ssl_hash</term>
+ <listitem><para>SSL hash function
+ (e.g. sha)</para></listitem>
+ </varlistentry>
+ <varlistentry>
<term>peer_cert_subject</term>
<listitem><para>The subject of the peer's SSL
certificate, in RFC4514 form.</para></listitem>
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index f67c6f46..6fa34ccc 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -58,7 +58,7 @@
(fun ((rabbit_types:message_properties()) -> boolean()), state())
-> state()).
-spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}).
--spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
+-spec(ack/2 :: ([ack()], state()) -> state()).
-spec(tx_publish/4 :: (rabbit_types:txn(), rabbit_types:basic_message(),
rabbit_types:message_properties(), state()) -> state()).
-spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()).
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl
index ae326a87..280ffd15 100644
--- a/include/rabbit_exchange_type_spec.hrl
+++ b/include/rabbit_exchange_type_spec.hrl
@@ -34,14 +34,14 @@
-spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
-> rabbit_router:match_result()).
-spec(validate/1 :: (rabbit_types:exchange()) -> 'ok').
--spec(create/1 :: (rabbit_types:exchange()) -> 'ok').
+-spec(create/2 :: (boolean(), rabbit_types:exchange()) -> 'ok').
-spec(recover/2 :: (rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok').
--spec(delete/2 :: (rabbit_types:exchange(),
+-spec(delete/3 :: (boolean(), rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok').
--spec(add_binding/2 :: (rabbit_types:exchange(),
+-spec(add_binding/3 :: (boolean(), rabbit_types:exchange(),
rabbit_types:binding()) -> 'ok').
--spec(remove_bindings/2 :: (rabbit_types:exchange(),
+-spec(remove_bindings/3 :: (boolean(), rabbit_types:exchange(),
[rabbit_types:binding()]) -> 'ok').
-spec(assert_args_equivalence/2 ::
(rabbit_types:exchange(), rabbit_framing:amqp_table())
diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat
index a4f8c8b4..ec61dc99 100644
--- a/scripts/rabbitmq-multi.bat
+++ b/scripts/rabbitmq-multi.bat
@@ -89,7 +89,7 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" (
-pa "!TDP0!..\ebin" ^
-noinput -hidden ^
!RABBITMQ_MULTI_ERL_ARGS! ^
--sname rabbitmq_multi ^
+-sname rabbitmq_multi!RANDOM! ^
!RABBITMQ_CONFIG_ARG! ^
-s rabbit_multi ^
!RABBITMQ_MULTI_START_ARGS! ^
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 52a250c6..ec5b4d85 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -118,7 +118,7 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
-pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
-s rabbit_prelaunch ^
--sname rabbitmqprelaunch%RANDOM% ^
+-sname rabbitmqprelaunch!RANDOM! ^
-extra "!RABBITMQ_PLUGINS_DIR:\=/!" ^
"!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!" ^
"!RABBITMQ_NODENAME!"
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat
index 563b9e58..4ffde73f 100644
--- a/scripts/rabbitmqctl.bat
+++ b/scripts/rabbitmqctl.bat
@@ -58,7 +58,7 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" (
exit /B
)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl -s rabbit_control -nodename !RABBITMQ_NODENAME! -extra !STAR!
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden !RABBITMQ_CTL_ERL_ARGS! -sname rabbitmqctl!RANDOM! -s rabbit_control -nodename !RABBITMQ_NODENAME! -extra !STAR!
endlocal
endlocal
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 35ed1c94..20097a7d 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -213,24 +213,28 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
end.
internal_declare(Q = #amqqueue{name = QueueName}, Recover) ->
- rabbit_misc:execute_mnesia_transaction(
+ rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
case Recover of
true ->
ok = store_queue(Q),
- Q;
+ rabbit_misc:const(Q);
false ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] ->
case mnesia:read({rabbit_durable_queue,
QueueName}) of
[] -> ok = store_queue(Q),
- ok = add_default_binding(Q),
- Q;
- [_] -> not_found %% Q exists on stopped node
+ B = add_default_binding(Q),
+ fun (Tx) ->
+ B(Tx),
+ Q
+ end;
+ [_] -> %% Q exists on stopped node
+ rabbit_misc:const(not_found)
end;
[ExistingQ] ->
- ExistingQ
+ rabbit_misc:const(ExistingQ)
end
end
end).
@@ -447,16 +451,18 @@ internal_delete1(QueueName) ->
rabbit_binding:remove_for_destination(QueueName).
internal_delete(QueueName) ->
- case rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({rabbit_queue, QueueName}) of
- [] -> {error, not_found};
- [_] -> internal_delete1(QueueName)
- end
- end) of
- {error, _} = Err -> Err;
- Deletions -> ok = rabbit_binding:process_deletions(Deletions)
- end.
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:wread({rabbit_queue, QueueName}) of
+ [] -> {error, not_found};
+ [_] -> internal_delete1(QueueName)
+ end
+ end,
+ fun ({error, _} = Err, _Tx) ->
+ Err;
+ (Deletions, Tx) ->
+ ok = rabbit_binding:process_deletions(Deletions, Tx)
+ end).
maybe_run_queue_via_backing_queue(QPid, Fun) ->
gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity).
@@ -480,16 +486,20 @@ drop_expired(QPid) ->
gen_server2:cast(QPid, drop_expired).
on_node_down(Node) ->
- rabbit_binding:process_deletions(
- lists:foldl(
- fun rabbit_binding:combine_deletions/2,
- rabbit_binding:new_deletions(),
- rabbit_misc:execute_mnesia_transaction(
- fun () -> qlc:e(qlc:q([delete_queue(QueueName) ||
- #amqqueue{name = QueueName, pid = Pid}
- <- mnesia:table(rabbit_queue),
- node(Pid) == Node]))
- end))).
+ rabbit_misc:execute_mnesia_transaction(
+ fun () -> qlc:e(qlc:q([delete_queue(QueueName) ||
+ #amqqueue{name = QueueName, pid = Pid}
+ <- mnesia:table(rabbit_queue),
+ node(Pid) == Node]))
+ end,
+ fun (Deletions, Tx) ->
+ rabbit_binding:process_deletions(
+ lists:foldl(
+ fun rabbit_binding:combine_deletions/2,
+ rabbit_binding:new_deletions(),
+ Deletions),
+ Tx)
+ end).
delete_queue(QueueName) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 981dd31d..38b83117 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -374,12 +374,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
- {State2, ChAckTags1} =
+ ChAckTags1 =
case AckRequired of
- true -> {State1,
- sets:add_element(AckTag, ChAckTags)};
- false -> {confirm_message(Message, State1),
- ChAckTags}
+ true -> sets:add_element(AckTag, ChAckTags);
+ false -> ChAckTags
end,
NewC = C#cr{unsent_message_count = Count + 1,
acktags = ChAckTags1},
@@ -396,10 +394,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
{ActiveConsumers1,
queue:in(QEntry, BlockedConsumers1)}
end,
- State3 = State2#q{
+ State2 = State1#q{
active_consumers = NewActiveConsumers,
blocked_consumers = NewBlockedConsumers},
- deliver_msgs_to_consumers(Funs, FunAcc1, State3);
+ deliver_msgs_to_consumers(Funs, FunAcc1, State2);
%% if IsMsgReady then we've hit the limiter
false when IsMsgReady ->
true = maybe_store_ch_record(C#cr{is_limit_active = true}),
@@ -427,22 +425,36 @@ deliver_from_queue_deliver(AckRequired, false, State) ->
fetch(AckRequired, State),
{{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
-confirm_messages(Guids, State) ->
- lists:foldl(fun confirm_message_by_guid/2, State, Guids).
-
-confirm_message_by_guid(Guid, State = #q{guid_to_channel = GTC}) ->
- case dict:find(Guid, GTC) of
- {ok, {_ , undefined}} -> ok;
- {ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo);
- _ -> ok
+confirm_messages(Guids, State = #q{guid_to_channel = GTC}) ->
+ {CMs, GTC1} =
+ lists:foldl(
+ fun(Guid, {CMs, GTC0}) ->
+ case dict:find(Guid, GTC0) of
+ {ok, {ChPid, MsgSeqNo}} ->
+ {[{ChPid, MsgSeqNo} | CMs], dict:erase(Guid, GTC0)};
+ _ ->
+ {CMs, GTC0}
+ end
+ end, {[], GTC}, Guids),
+ case lists:usort(CMs) of
+ [{Ch, MsgSeqNo} | CMs1] ->
+ [rabbit_channel:confirm(ChPid, MsgSeqNos) ||
+ {ChPid, MsgSeqNos} <- group_confirms_by_channel(
+ CMs1, [{Ch, [MsgSeqNo]}])];
+ [] ->
+ ok
end,
- State#q{guid_to_channel = dict:erase(Guid, GTC)}.
+ State#q{guid_to_channel = GTC1}.
-confirm_message(#basic_message{guid = Guid}, State) ->
- confirm_message_by_guid(Guid, State).
+group_confirms_by_channel([], Acc) ->
+ Acc;
+group_confirms_by_channel([{Ch, Msg1} | CMs], [{Ch, Msgs} | Acc]) ->
+ group_confirms_by_channel(CMs, [{Ch, [Msg1 | Msgs]} | Acc]);
+group_confirms_by_channel([{Ch, Msg1} | CMs], Acc) ->
+ group_confirms_by_channel(CMs, [{Ch, [Msg1]} | Acc]).
record_confirm_message(#delivery{msg_seq_no = undefined}, State) ->
- State;
+ {no_confirm, State};
record_confirm_message(#delivery{sender = ChPid,
msg_seq_no = MsgSeqNo,
message = #basic_message {
@@ -451,14 +463,10 @@ record_confirm_message(#delivery{sender = ChPid,
State =
#q{guid_to_channel = GTC,
q = #amqqueue{durable = true}}) ->
- State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)};
+ {confirm,
+ State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}};
record_confirm_message(_Delivery, State) ->
- State.
-
-ack_by_acktags(AckTags, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {AckdGuids, BQS1} = BQ:ack(AckTags, BQS),
- confirm_messages(AckdGuids, State#q{backing_queue_state = BQS1}).
+ {no_confirm, State}.
run_message_queue(State) ->
Funs = {fun deliver_from_queue_pred/2,
@@ -473,12 +481,12 @@ attempt_delivery(#delivery{txn = none,
sender = ChPid,
message = Message,
msg_seq_no = MsgSeqNo},
- State = #q{backing_queue = BQ, q = Q}) ->
- NeedsConfirming = Message#basic_message.is_persistent andalso
- Q#amqqueue.durable,
- case NeedsConfirming of
- false -> rabbit_channel:confirm(ChPid, MsgSeqNo);
- _ -> ok
+ {NeedsConfirming, State = #q{backing_queue = BQ}}) ->
+ %% must confirm immediately if it has a MsgSeqNo and not NeedsConfirming
+ case {NeedsConfirming, MsgSeqNo} of
+ {_, undefined} -> ok;
+ {no_confirm, _} -> rabbit_channel:confirm(ChPid, [MsgSeqNo]);
+ {confirm, _} -> ok
end,
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
@@ -490,31 +498,37 @@ attempt_delivery(#delivery{txn = none,
BQ:publish_delivered(
AckRequired, Message,
(?BASE_MESSAGE_PROPERTIES)#message_properties{
- needs_confirming = NeedsConfirming},
+ needs_confirming = (NeedsConfirming =:= confirm)},
BQS),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
end,
- deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State);
+ {Delivered, State1} =
+ deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State),
+ {Delivered, NeedsConfirming, State1};
attempt_delivery(#delivery{txn = Txn,
sender = ChPid,
message = Message},
- State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ {NeedsConfirming,
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}}) ->
record_current_channel_tx(ChPid, Txn),
{true,
+ NeedsConfirming,
State#q{backing_queue_state =
BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}.
deliver_or_enqueue(Delivery, State) ->
case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of
- {true, State1} ->
+ {true, _, State1} ->
{true, State1};
- {false, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
- #delivery{message = Message, msg_seq_no = MsgSeqNo} = Delivery,
+ {false, NeedsConfirming, State1 = #q{backing_queue = BQ,
+ backing_queue_state = BQS}} ->
+ #delivery{message = Message} = Delivery,
BQS1 = BQ:publish(Message,
(message_properties(State)) #message_properties{
- needs_confirming = (MsgSeqNo =/= undefined)},
+ needs_confirming =
+ (NeedsConfirming =:= confirm)},
BQS),
{false, ensure_ttl_timer(State1#q{backing_queue_state = BQS1})}
end.
@@ -771,18 +785,19 @@ prioritise_call(Msg, _From, _State) ->
prioritise_cast(Msg, _State) ->
case Msg of
- update_ram_duration -> 8;
- delete_immediately -> 8;
- {set_ram_duration_target, _Duration} -> 8;
- {set_maximum_since_use, _Age} -> 8;
- maybe_expire -> 8;
- drop_expired -> 8;
- emit_stats -> 7;
- {ack, _Txn, _MsgIds, _ChPid} -> 7;
- {reject, _MsgIds, _Requeue, _ChPid} -> 7;
- {notify_sent, _ChPid} -> 7;
- {unblock, _ChPid} -> 7;
- _ -> 0
+ update_ram_duration -> 8;
+ delete_immediately -> 8;
+ {set_ram_duration_target, _Duration} -> 8;
+ {set_maximum_since_use, _Age} -> 8;
+ maybe_expire -> 8;
+ drop_expired -> 8;
+ emit_stats -> 7;
+ {ack, _Txn, _MsgIds, _ChPid} -> 7;
+ {reject, _MsgIds, _Requeue, _ChPid} -> 7;
+ {notify_sent, _ChPid} -> 7;
+ {unblock, _ChPid} -> 7;
+ {maybe_run_queue_via_backing_queue, _Fun} -> 6;
+ _ -> 0
end.
prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
@@ -823,7 +838,7 @@ handle_call({info, Items}, _From, State) ->
handle_call(consumers, _From, State) ->
reply(consumers(State), State);
-handle_call({deliver_immediately, Delivery = #delivery{message = Message}},
+handle_call({deliver_immediately, Delivery},
_From, State) ->
%% Synchronous, "immediate" delivery mode
%%
@@ -838,17 +853,15 @@ handle_call({deliver_immediately, Delivery = #delivery{message = Message}},
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, State1} =
+ {Delivered, _NeedsConfirming, State1} =
attempt_delivery(Delivery, record_confirm_message(Delivery, State)),
- reply(Delivered, case Delivered of
- true -> State1;
- false -> confirm_message(Message, State1)
- end);
+ reply(Delivered, State1);
-handle_call({deliver, Delivery}, _From, State) ->
- %% Synchronous, "mandatory" delivery mode
- {Delivered, NewState} = deliver_or_enqueue(Delivery, State),
- reply(Delivered, NewState);
+handle_call({deliver, Delivery}, From, State) ->
+ %% Synchronous, "mandatory" delivery mode. Reply asap.
+ gen_server2:reply(From, true),
+ {_Delivered, NewState} = deliver_or_enqueue(Delivery, State),
+ noreply(NewState);
handle_call({commit, Txn, ChPid}, From, State) ->
NewState = commit_transaction(Txn, From, ChPid, State),
@@ -881,7 +894,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
sets:add_element(AckTag,
ChAckTags)}),
State2;
- false -> confirm_message(Message, State2)
+ false -> State2
end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
reply({ok, Remaining, Msg}, State3)
@@ -1019,8 +1032,8 @@ handle_cast({ack, Txn, AckTags, ChPid},
case Txn of
none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags),
NewC = C#cr{acktags = ChAckTags1},
- NewState = ack_by_acktags(AckTags, State),
- {NewC, NewState};
+ BQS1 = BQ:ack(AckTags, BQS),
+ {NewC, State#q{backing_queue_state = BQS1}};
_ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS),
{C#cr{txn = Txn},
State#q{backing_queue_state = BQS1}}
@@ -1029,7 +1042,9 @@ handle_cast({ack, Txn, AckTags, ChPid},
noreply(State1)
end;
-handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
+handle_cast({reject, AckTags, Requeue, ChPid},
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
case lookup_ch(ChPid) of
not_found ->
noreply(State);
@@ -1038,7 +1053,8 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
maybe_store_ch_record(C#cr{acktags = ChAckTags1}),
noreply(case Requeue of
true -> requeue_and_run(AckTags, State);
- false -> ack_by_acktags(AckTags, State)
+ false -> BQS1 = BQ:ack(AckTags, BQS),
+ State#q{backing_queue_state = BQS1}
end)
end;
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index 79910b95..233e2b90 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -97,7 +97,7 @@ description() ->
{description, <<"Internal user / password database">>}].
check_user_login(Username, []) ->
- internal_check_user_login(Username, fun() -> true end);
+ internal_check_user_login(Username, fun(_) -> true end);
check_user_login(Username, [{password, Password}]) ->
internal_check_user_login(
Username,
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index a5297a70..e81066da 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -344,8 +344,7 @@ lookup_amqp_exception(#amqp_error{name = Name,
{ShouldClose, Code, ExplBin, Method};
lookup_amqp_exception(Other, Protocol) ->
rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]),
- {ShouldClose, Code, Text} =
- Protocol:lookup_amqp_exception(internal_error, Protocol),
+ {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(internal_error),
{ShouldClose, Code, Text, none}.
amqp_exception_explanation(Text, Expl) ->
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index ccadf5af..74fd00b7 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -36,7 +36,7 @@
-export([list_for_source/1, list_for_destination/1,
list_for_source_and_destination/2]).
-export([new_deletions/0, combine_deletions/2, add_deletion/3,
- process_deletions/1]).
+ process_deletions/2]).
-export([info_keys/0, info/1, info/2, info_all/1, info_all/2]).
%% these must all be run inside a mnesia tx
-export([has_for_source/1, remove_for_source/1,
@@ -91,7 +91,7 @@
(rabbit_types:binding_destination()) -> deletions()).
-spec(remove_transient_for_destination/1 ::
(rabbit_types:binding_destination()) -> deletions()).
--spec(process_deletions/1 :: (deletions()) -> 'ok').
+-spec(process_deletions/2 :: (deletions(), boolean()) -> 'ok').
-spec(combine_deletions/2 :: (deletions(), deletions()) -> deletions()).
-spec(add_deletion/3 :: (rabbit_exchange:name(),
{'undefined' | rabbit_types:exchange(),
@@ -118,69 +118,66 @@ recover() ->
exists(Binding) ->
binding_action(
- Binding,
- fun (_Src, _Dst, B) -> mnesia:read({rabbit_route, B}) /= [] end).
+ Binding, fun (_Src, _Dst, B) ->
+ rabbit_misc:const(mnesia:read({rabbit_route, B}) /= [])
+ end).
add(Binding) -> add(Binding, fun (_Src, _Dst) -> ok end).
remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end).
add(Binding, InnerFun) ->
- case binding_action(
- Binding,
- fun (Src, Dst, B) ->
- %% this argument is used to check queue exclusivity;
- %% in general, we want to fail on that in preference to
- %% anything else
- case InnerFun(Src, Dst) of
- ok ->
- case mnesia:read({rabbit_route, B}) of
- [] -> ok = sync_binding(
- B, all_durable([Src, Dst]),
- fun mnesia:write/3),
- {new, Src, B};
- [_] -> {existing, Src, B}
- end;
- {error, _} = E ->
- E
- end
- end) of
- {new, Src = #exchange{ type = Type }, B} ->
- ok = (type_to_module(Type)):add_binding(Src, B),
- rabbit_event:notify(binding_created, info(B));
- {existing, _, _} ->
- ok;
- {error, _} = Err ->
- Err
- end.
+ binding_action(
+ Binding,
+ fun (Src, Dst, B) ->
+ %% this argument is used to check queue exclusivity;
+ %% in general, we want to fail on that in preference to
+ %% anything else
+ case InnerFun(Src, Dst) of
+ ok ->
+ case mnesia:read({rabbit_route, B}) of
+ [] -> ok = sync_binding(B, all_durable([Src, Dst]),
+ fun mnesia:write/3),
+ fun (Tx) ->
+ ok = rabbit_exchange:callback(
+ Src, add_binding, [Tx, Src, B]),
+ rabbit_event:notify_if(
+ not Tx, binding_created, info(B))
+ end;
+ [_] -> fun rabbit_misc:const_ok/1
+ end;
+ {error, _} = Err ->
+ rabbit_misc:const(Err)
+ end
+ end).
remove(Binding, InnerFun) ->
- case binding_action(
- Binding,
- fun (Src, Dst, B) ->
- case mnesia:match_object(rabbit_route, #route{binding = B},
- write) of
- [] ->
- {error, binding_not_found};
- [_] ->
- case InnerFun(Src, Dst) of
- ok ->
- ok = sync_binding(
- B, all_durable([Src, Dst]),
- fun mnesia:delete_object/3),
- {ok,
- maybe_auto_delete(B#binding.source,
- [B], new_deletions())};
- {error, _} = E ->
- E
- end
- end
- end) of
- {error, _} = Err ->
- Err;
- {ok, Deletions} ->
- ok = process_deletions(Deletions)
- end.
+ binding_action(
+ Binding,
+ fun (Src, Dst, B) ->
+ Result =
+ case mnesia:match_object(rabbit_route, #route{binding = B},
+ write) of
+ [] ->
+ {error, binding_not_found};
+ [_] ->
+ case InnerFun(Src, Dst) of
+ ok ->
+ ok = sync_binding(B, all_durable([Src, Dst]),
+ fun mnesia:delete_object/3),
+ {ok, maybe_auto_delete(B#binding.source,
+ [B], new_deletions())};
+ {error, _} = E ->
+ E
+ end
+ end,
+ case Result of
+ {error, _} = Err ->
+ rabbit_misc:const(Err);
+ {ok, Deletions} ->
+ fun (Tx) -> ok = process_deletions(Deletions, Tx) end
+ end
+ end).
list(VHostPath) ->
VHostResource = rabbit_misc:r(VHostPath, '_'),
@@ -290,24 +287,22 @@ sync_binding(Binding, Durable, Fun) ->
call_with_source_and_destination(SrcName, DstName, Fun) ->
SrcTable = table_for_resource(SrcName),
DstTable = table_for_resource(DstName),
- rabbit_misc:execute_mnesia_transaction(
- fun () -> case {mnesia:read({SrcTable, SrcName}),
- mnesia:read({DstTable, DstName})} of
- {[Src], [Dst]} -> Fun(Src, Dst);
- {[], [_] } -> {error, source_not_found};
- {[_], [] } -> {error, destination_not_found};
- {[], [] } -> {error, source_and_destination_not_found}
- end
+ ErrFun = fun (Err) -> rabbit_misc:const(Err) end,
+ rabbit_misc:execute_mnesia_tx_with_tail(
+ fun () ->
+ case {mnesia:read({SrcTable, SrcName}),
+ mnesia:read({DstTable, DstName})} of
+ {[Src], [Dst]} -> Fun(Src, Dst);
+ {[], [_] } -> ErrFun({error, source_not_found});
+ {[_], [] } -> ErrFun({error, destination_not_found});
+ {[], [] } -> ErrFun({error,
+ source_and_destination_not_found})
+ end
end).
table_for_resource(#resource{kind = exchange}) -> rabbit_exchange;
table_for_resource(#resource{kind = queue}) -> rabbit_queue.
-%% Used with atoms from records; e.g., the type is expected to exist.
-type_to_module(T) ->
- {ok, Module} = rabbit_registry:lookup_module(exchange, T),
- Module.
-
contains(Table, MatchHead) ->
continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)).
@@ -423,17 +418,19 @@ merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
anything_but(not_deleted, Deleted1, Deleted2),
[Bindings1 | Bindings2]}.
-process_deletions(Deletions) ->
+process_deletions(Deletions, Tx) ->
dict:fold(
- fun (_XName, {X = #exchange{ type = Type }, Deleted, Bindings}, ok) ->
+ fun (_XName, {X, Deleted, Bindings}, ok) ->
FlatBindings = lists:flatten(Bindings),
- [rabbit_event:notify(binding_deleted, info(B)) ||
+ [rabbit_event:notify_if(not Tx, binding_deleted, info(B)) ||
B <- FlatBindings],
- TypeModule = type_to_module(Type),
case Deleted of
- not_deleted -> TypeModule:remove_bindings(X, FlatBindings);
- deleted -> rabbit_event:notify(exchange_deleted,
- [{name, X#exchange.name}]),
- TypeModule:delete(X, FlatBindings)
+ not_deleted ->
+ rabbit_exchange:callback(X, remove_bindings,
+ [Tx, X, FlatBindings]);
+ deleted ->
+ rabbit_event:notify_if(not Tx, exchange_deleted,
+ [{name, X#exchange.name}]),
+ rabbit_exchange:callback(X, delete, [Tx, X, FlatBindings])
end
end, ok, Deletions).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 2067e306..7b5f096b 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -36,7 +36,7 @@
-behaviour(gen_server2).
-export([start_link/7, do/2, do/3, flush/1, shutdown/1]).
--export([send_command/2, deliver/4, flushed/2, confirm/2, flush_confirms/1]).
+-export([send_command/2, deliver/4, flushed/2, confirm/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([emit_stats/1]).
@@ -49,8 +49,7 @@
uncommitted_ack_q, unacked_message_q,
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
- confirm_enabled, publish_seqno, confirm_multiple, confirm_tref,
- held_confirms, unconfirmed, queues_for_msg}).
+ confirm_enabled, publish_seqno, unconfirmed, queues_for_msg}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -72,8 +71,6 @@
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
--define(FLUSH_CONFIRMS_INTERVAL, 1000).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -97,8 +94,7 @@
(pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
-> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
--spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok').
--spec(flush_confirms/1 :: (pid()) -> 'ok').
+-spec(confirm/2 ::(pid(), [non_neg_integer()]) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
@@ -137,11 +133,8 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
-confirm(Pid, MsgSeqNo) ->
- gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}).
-
-flush_confirms(Pid) ->
- gen_server2:cast(Pid, flush_confirms).
+confirm(Pid, MsgSeqNos) ->
+ gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}).
list() ->
pg_local:get_members(rabbit_channels).
@@ -192,9 +185,7 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
queue_collector_pid = CollectorPid,
stats_timer = StatsTimer,
confirm_enabled = false,
- publish_seqno = 0,
- confirm_multiple = false,
- held_confirms = gb_sets:new(),
+ publish_seqno = 1,
unconfirmed = gb_sets:new(),
queues_for_msg = dict:new()},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
@@ -292,11 +283,8 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)},
hibernate};
-handle_cast(flush_confirms, State) ->
- {noreply, internal_flush_confirms(State)};
-
-handle_cast({confirm, MsgSeqNo, From}, State) ->
- {noreply, confirm(MsgSeqNo, From, State)}.
+handle_cast({confirm, MsgSeqNos, From}, State) ->
+ {noreply, confirm(MsgSeqNos, From, State)}.
handle_info({'DOWN', _MRef, process, QPid, _Reason},
State = #ch{queues_for_msg = QFM}) ->
@@ -304,7 +292,7 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) ->
Qs = sets:del_element(QPid, QPids),
case sets:size(Qs) of
- 0 -> confirm(Msg, QPid, State0);
+ 0 -> confirm([Msg], QPid, State0);
_ -> State0#ch{queues_for_msg =
dict:store(Msg, Qs, QFM0)}
end
@@ -312,16 +300,15 @@ handle_info({'DOWN', _MRef, process, QPid, _Reason},
erase_queue_stats(QPid),
{noreply, queue_blocked(QPid, State1), hibernate}.
-handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
+handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
- State1 = internal_flush_confirms(State),
rabbit_event:if_enabled(StatsTimer,
fun () ->
internal_emit_stats(
State, [{idle_since, now()}])
end),
StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer),
- {hibernate, State1#ch{stats_timer = StatsTimer1}}.
+ {hibernate, State#ch{stats_timer = StatsTimer1}}.
terminate(_Reason, State = #ch{state = terminating}) ->
terminate(State);
@@ -484,51 +471,30 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-confirm(undefined, _QPid, State) ->
+confirm([], _QPid, State) ->
State;
-confirm(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) ->
- State;
-confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) ->
- do_if_unconfirmed(MsgSeqNo, QPid,
- fun(MSN, State1 = #ch{writer_pid = WriterPid}) ->
- ok = rabbit_writer:send_command(
- WriterPid, #'basic.ack'{
- delivery_tag = MSN}),
- State1
- end, State);
-confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) ->
- do_if_unconfirmed(MsgSeqNo, QPid,
- fun(MSN, State1 = #ch{held_confirms = As}) ->
- start_confirm_timer(
- State1#ch{held_confirms = gb_sets:add(MSN, As)})
- end, State).
-
-do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun,
- State = #ch{unconfirmed = UC,
- queues_for_msg = QFM}) ->
- %% clears references to MsgSeqNo and does ConfirmFun
- case gb_sets:is_element(MsgSeqNo, UC) of
- true ->
- Unconfirmed1 = gb_sets:delete(MsgSeqNo, UC),
- case QPid of
- undefined ->
- ConfirmFun(MsgSeqNo, State#ch{unconfirmed = Unconfirmed1});
- _ ->
- {ok, Qs} = dict:find(MsgSeqNo, QFM),
- Qs1 = sets:del_element(QPid, Qs),
- case sets:size(Qs1) of
- 0 -> ConfirmFun(MsgSeqNo,
- State#ch{
- queues_for_msg =
- dict:erase(MsgSeqNo, QFM),
- unconfirmed = Unconfirmed1});
- _ -> State#ch{queues_for_msg =
- dict:store(MsgSeqNo, Qs1, QFM)}
- end
- end;
- false ->
- State
- end.
+confirm(MsgSeqNos, QPid, State) ->
+ {DoneMessages, State1} =
+ lists:foldl(
+ fun(MsgSeqNo, {DMs, State0 = #ch{unconfirmed = UC0,
+ queues_for_msg = QFM0}}) ->
+ case gb_sets:is_element(MsgSeqNo, UC0) of
+ false -> {DMs, State0};
+ true -> Qs1 = sets:del_element(
+ QPid, dict:fetch(MsgSeqNo, QFM0)),
+ case sets:size(Qs1) of
+ 0 -> {[MsgSeqNo | DMs],
+ State0#ch{
+ queues_for_msg =
+ dict:erase(MsgSeqNo, QFM0),
+ unconfirmed =
+ gb_sets:delete(MsgSeqNo, UC0)}};
+ _ -> QFM1 = dict:store(MsgSeqNo, Qs1, QFM0),
+ {DMs, State0#ch{queues_for_msg = QFM1}}
+ end
+ end
+ end, {[], State}, MsgSeqNos),
+ send_confirms(DoneMessages, State1).
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};
@@ -564,15 +530,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
check_user_id_header(DecodedContent#content.properties, State),
IsPersistent = is_message_persistent(DecodedContent),
- {MsgSeqNo, State1}
- = case ConfirmEnabled of
- false -> {undefined, State};
- true -> SeqNo = State#ch.publish_seqno,
- {SeqNo,
- State#ch{publish_seqno = SeqNo + 1,
- unconfirmed =
- gb_sets:add(SeqNo, State#ch.unconfirmed)}}
- end,
+ {MsgSeqNo, State1} =
+ case ConfirmEnabled of
+ false -> {undefined, State};
+ true -> SeqNo = State#ch.publish_seqno,
+ {SeqNo, State#ch{publish_seqno = SeqNo + 1}}
+ end,
Message = #basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = DecodedContent,
@@ -1010,20 +973,10 @@ handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId})
rabbit_misc:protocol_error(
precondition_failed, "cannot switch from tx to confirm mode", []);
-handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait},
- _, State = #ch{confirm_enabled = false}) ->
- return_ok(State#ch{confirm_enabled = true, confirm_multiple = Multiple},
+handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
+ return_ok(State#ch{confirm_enabled = true},
NoWait, #'confirm.select_ok'{});
-handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait},
- _, State = #ch{confirm_enabled = true,
- confirm_multiple = Multiple}) ->
- return_ok(State, NoWait, #'confirm.select_ok'{});
-
-handle_method(#'confirm.select'{}, _, #ch{confirm_enabled = true}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "cannot change confirm_multiple setting", []);
-
handle_method(#'channel.flow'{active = true}, _,
State = #ch{limiter_pid = LimiterPid}) ->
LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of
@@ -1253,66 +1206,53 @@ is_message_persistent(Content) ->
process_routing_result(unroutable, _, MsgSeqNo, Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_route),
- confirm(MsgSeqNo, undefined, State);
+ send_confirms([MsgSeqNo], State);
process_routing_result(not_delivered, _, MsgSeqNo, Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_consumers),
- confirm(MsgSeqNo, undefined, State);
+ send_confirms([MsgSeqNo], State);
process_routing_result(routed, [], MsgSeqNo, _, State) ->
- confirm(MsgSeqNo, undefined, State);
+ send_confirms([MsgSeqNo], State);
process_routing_result(routed, _, undefined, _, State) ->
State;
-process_routing_result(routed, QPids, MsgSeqNo, _,
- State = #ch{queues_for_msg = QFM}) ->
- QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM),
+process_routing_result(routed, QPids, MsgSeqNo, _, State) ->
+ #ch{queues_for_msg = QFM, unconfirmed = UC} = State,
[maybe_monitor(QPid) || QPid <- QPids],
- State#ch{queues_for_msg = QFM1}.
+ State#ch{queues_for_msg = dict:store(MsgSeqNo, sets:from_list(QPids), QFM),
+ unconfirmed = gb_sets:add(MsgSeqNo, UC)}.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};
lock_message(false, _MsgStruct, State) ->
State.
-start_confirm_timer(State = #ch{confirm_tref = undefined}) ->
- {ok, TRef} = timer:apply_after(?FLUSH_CONFIRMS_INTERVAL,
- ?MODULE, flush_confirms, [self()]),
- State#ch{confirm_tref = TRef};
-start_confirm_timer(State) ->
- State.
-
-stop_confirm_timer(State = #ch{confirm_tref = undefined}) ->
+send_confirms([], State) ->
State;
-stop_confirm_timer(State = #ch{confirm_tref = TRef}) ->
- {ok, cancel} = timer:cancel(TRef),
- State#ch{confirm_tref = undefined}.
-
-internal_flush_confirms(State = #ch{writer_pid = WriterPid,
- held_confirms = Cs}) ->
- case gb_sets:is_empty(Cs) of
- true -> State#ch{confirm_tref = undefined};
- false -> [First | Rest] = gb_sets:to_list(Cs),
- {Mult, Inds} = find_consecutive_sequence(First, Rest),
- ok = rabbit_writer:send_command(
- WriterPid,
- #'basic.ack'{delivery_tag = Mult, multiple = true}),
- ok = lists:foldl(
- fun(T, ok) -> rabbit_writer:send_command(
- WriterPid,
- #'basic.ack'{delivery_tag = T})
- end, ok, Inds),
- State#ch{held_confirms = gb_sets:new(),
- confirm_tref = undefined}
- end.
+send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) ->
+ send_confirm(MsgSeqNo, WriterPid),
+ State;
+send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
+ SCs = lists:usort(Cs),
+ CutOff = case gb_sets:is_empty(UC) of
+ true -> lists:last(SCs) + 1;
+ false -> gb_sets:smallest(UC)
+ end,
+ {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs),
+ case Ms of
+ [] -> ok;
+ _ -> ok = rabbit_writer:send_command(
+ WriterPid, #'basic.ack'{delivery_tag = lists:last(Ms),
+ multiple = true})
+ end,
+ [ok = send_confirm(SeqNo, WriterPid) || SeqNo <- Ss],
+ State.
-%% Find longest sequence of consecutive numbers at the beginning.
-find_consecutive_sequence(Last, []) ->
- {Last, []};
-find_consecutive_sequence(Last, [N | Ns]) when N == (Last + 1) ->
- find_consecutive_sequence(N, Ns);
-find_consecutive_sequence(Last, Ns) ->
- {Last, Ns}.
+send_confirm(undefined, _WriterPid) ->
+ ok;
+send_confirm(SeqNo, WriterPid) ->
+ ok = rabbit_writer:send_command(WriterPid,
+ #'basic.ack'{delivery_tag = SeqNo}).
-terminate(State) ->
- stop_confirm_timer(State),
+terminate(_State) ->
pg_local:leave(rabbit_channels, self()),
rabbit_event:notify(channel_closed, [{pid, self()}]).
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index a36253a0..9f50176d 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -50,7 +50,7 @@
rabbit_channel:channel_number(), non_neg_integer(), pid(),
rabbit_types:user(), rabbit_types:vhost(), pid()}).
--spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), pid()}).
+-spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}).
-endif.
@@ -72,13 +72,8 @@ start_link({Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost,
[Channel, ReaderPid, WriterPid, User, VHost,
Collector, start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
- {ok, FramingChannelPid} =
- supervisor2:start_child(
- SupPid,
- {framing_channel, {rabbit_framing_channel, start_link,
- [ReaderPid, ChannelPid, Protocol]},
- intrinsic, ?MAX_WAIT, worker, [rabbit_framing_channel]}),
- {ok, SupPid, FramingChannelPid}.
+ {ok, AState} = rabbit_command_assembler:init(Protocol),
+ {ok, SupPid, {ChannelPid, AState}}.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl
index 21c39780..fd99af56 100644
--- a/src/rabbit_channel_sup_sup.erl
+++ b/src/rabbit_channel_sup_sup.erl
@@ -43,7 +43,7 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(start_channel/2 :: (pid(), rabbit_channel_sup:start_link_args()) ->
- {'ok', pid(), pid()}).
+ {'ok', pid(), {pid(), any()}}).
-endif.
diff --git a/src/rabbit_command_assembler.erl b/src/rabbit_command_assembler.erl
new file mode 100644
index 00000000..f8d3260e
--- /dev/null
+++ b/src/rabbit_command_assembler.erl
@@ -0,0 +1,148 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_command_assembler).
+-include("rabbit_framing.hrl").
+-include("rabbit.hrl").
+
+-export([analyze_frame/3, init/1, process/2]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(frame_type() :: ?FRAME_METHOD | ?FRAME_HEADER | ?FRAME_BODY |
+ ?FRAME_OOB_METHOD | ?FRAME_OOB_HEADER | ?FRAME_OOB_BODY |
+ ?FRAME_TRACE | ?FRAME_HEARTBEAT).
+-type(protocol() :: rabbit_framing:protocol()).
+-type(method() :: rabbit_framing:amqp_method_record()).
+-type(class_id() :: rabbit_framing:amqp_class_id()).
+-type(weight() :: non_neg_integer()).
+-type(body_size() :: non_neg_integer()).
+-type(content() :: rabbit_types:undecoded_content()).
+
+-type(frame() ::
+ {'method', rabbit_framing:amqp_method_name(), binary()} |
+ {'content_header', class_id(), weight(), body_size(), binary()} |
+ {'content_body', binary()}).
+
+-type(state() ::
+ {'method', protocol()} |
+ {'content_header', method(), class_id(), protocol()} |
+ {'content_body', method(), body_size(), class_id(), protocol()}).
+
+-spec(analyze_frame/3 :: (frame_type(), binary(), protocol()) ->
+ frame() | 'heartbeat' | 'error').
+
+-spec(init/1 :: (protocol()) -> {ok, state()}).
+-spec(process/2 :: (frame(), state()) ->
+ {ok, state()} |
+ {ok, method(), state()} |
+ {ok, method(), content(), state()} |
+ {error, rabbit_types:amqp_error()}).
+
+-endif.
+
+%%--------------------------------------------------------------------
+
+analyze_frame(?FRAME_METHOD,
+ <<ClassId:16, MethodId:16, MethodFields/binary>>,
+ Protocol) ->
+ MethodName = Protocol:lookup_method_name({ClassId, MethodId}),
+ {method, MethodName, MethodFields};
+analyze_frame(?FRAME_HEADER,
+ <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>,
+ _Protocol) ->
+ {content_header, ClassId, Weight, BodySize, Properties};
+analyze_frame(?FRAME_BODY, Body, _Protocol) ->
+ {content_body, Body};
+analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) ->
+ heartbeat;
+analyze_frame(_Type, _Body, _Protocol) ->
+ error.
+
+init(Protocol) -> {ok, {method, Protocol}}.
+
+process({method, MethodName, FieldsBin}, {method, Protocol}) ->
+ try
+ Method = Protocol:decode_method_fields(MethodName, FieldsBin),
+ case Protocol:method_has_content(MethodName) of
+ true -> {ClassId, _MethodId} = Protocol:method_id(MethodName),
+ {ok, {content_header, Method, ClassId, Protocol}};
+ false -> {ok, Method, {method, Protocol}}
+ end
+ catch exit:#amqp_error{} = Reason -> {error, Reason}
+ end;
+process(_Frame, {method, _Protocol}) ->
+ unexpected_frame("expected method frame, "
+ "got non method frame instead", [], none);
+process({content_header, ClassId, 0, 0, PropertiesBin},
+ {content_header, Method, ClassId, Protocol}) ->
+ Content = empty_content(ClassId, PropertiesBin, Protocol),
+ {ok, Method, Content, {method, Protocol}};
+process({content_header, ClassId, 0, BodySize, PropertiesBin},
+ {content_header, Method, ClassId, Protocol}) ->
+ Content = empty_content(ClassId, PropertiesBin, Protocol),
+ {ok, {content_body, Method, BodySize, Content, Protocol}};
+process({content_header, HeaderClassId, 0, _BodySize, _PropertiesBin},
+ {content_header, Method, ClassId, _Protocol}) ->
+ unexpected_frame("expected content header for class ~w, "
+ "got one for class ~w instead",
+ [ClassId, HeaderClassId], Method);
+process(_Frame, {content_header, Method, ClassId, _Protocol}) ->
+ unexpected_frame("expected content header for class ~w, "
+ "got non content header frame instead", [ClassId], Method);
+process({content_body, FragmentBin},
+ {content_body, Method, RemainingSize,
+ Content = #content{payload_fragments_rev = Fragments}, Protocol}) ->
+ NewContent = Content#content{
+ payload_fragments_rev = [FragmentBin | Fragments]},
+ case RemainingSize - size(FragmentBin) of
+ 0 -> {ok, Method, NewContent, {method, Protocol}};
+ Sz -> {ok, {content_body, Method, Sz, NewContent, Protocol}}
+ end;
+process(_Frame, {content_body, Method, _RemainingSize, _Content, _Protocol}) ->
+ unexpected_frame("expected content body, "
+ "got non content body frame instead", [], Method).
+
+%%--------------------------------------------------------------------
+
+empty_content(ClassId, PropertiesBin, Protocol) ->
+ #content{class_id = ClassId,
+ properties = none,
+ properties_bin = PropertiesBin,
+ protocol = Protocol,
+ payload_fragments_rev = []}.
+
+unexpected_frame(Format, Params, Method) when is_atom(Method) ->
+ {error, rabbit_misc:amqp_error(unexpected_frame, Format, Params, Method)};
+unexpected_frame(Format, Params, Method) ->
+ unexpected_frame(Format, Params, rabbit_misc:method_record_type(Method)).
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index ff3995b5..a6b1f7fa 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -78,4 +78,3 @@ reader(Pid) ->
init([]) ->
{ok, {{one_for_all, 0, 1}, []}}.
-
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 2b236531..9755654b 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -37,7 +37,7 @@
-export([init_stats_timer/0, ensure_stats_timer/2, stop_stats_timer/1]).
-export([reset_stats_timer/1]).
-export([stats_level/1, if_enabled/2]).
--export([notify/2]).
+-export([notify/2, notify_if/3]).
%%----------------------------------------------------------------------------
@@ -77,6 +77,7 @@
-spec(stats_level/1 :: (state()) -> level()).
-spec(if_enabled/2 :: (state(), timer_fun()) -> 'ok').
-spec(notify/2 :: (event_type(), event_props()) -> 'ok').
+-spec(notify_if/3 :: (boolean(), event_type(), event_props()) -> 'ok').
-endif.
@@ -140,6 +141,9 @@ if_enabled(_State, Fun) ->
Fun(),
ok.
+notify_if(true, Type, Props) -> notify(Type, Props);
+notify_if(false, _Type, _Props) -> ok.
+
notify(Type, Props) ->
try
%% TODO: switch to os:timestamp() when we drop support for
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index a95cf0b1..83c26e68 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -35,6 +35,7 @@
-export([recover/0, declare/6, lookup/1, lookup_or_die/1, list/1, info_keys/0,
info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]).
+-export([callback/3]).
%% this must be run inside a mnesia tx
-export([maybe_auto_delete/1]).
-export([assert_equivalence/6, assert_args_equivalence/2, check_type/1]).
@@ -86,6 +87,7 @@
-spec(maybe_auto_delete/1::
(rabbit_types:exchange())
-> 'not_deleted' | {'deleted', rabbit_binding:deletions()}).
+-spec(callback/3:: (rabbit_types:exchange(), atom(), [any()]) -> 'ok').
-endif.
@@ -121,34 +123,32 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) ->
auto_delete = AutoDelete,
internal = Internal,
arguments = Args},
- %% We want to upset things if it isn't ok; this is different from
- %% the other hooks invocations, where we tend to ignore the return
- %% value.
- TypeModule = type_to_module(Type),
- ok = TypeModule:validate(X),
- case rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:wread({rabbit_exchange, XName}) of
- [] ->
- ok = mnesia:write(rabbit_exchange, X, write),
- ok = case Durable of
- true ->
- mnesia:write(rabbit_durable_exchange,
+ %% We want to upset things if it isn't ok
+ ok = (type_to_module(Type)):validate(X),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:wread({rabbit_exchange, XName}) of
+ [] ->
+ ok = mnesia:write(rabbit_exchange, X, write),
+ ok = case Durable of
+ true -> mnesia:write(rabbit_durable_exchange,
X, write);
- false ->
- ok
+ false -> ok
end,
- {new, X};
- [ExistingX] ->
- {existing, ExistingX}
- end
- end) of
- {new, X} -> TypeModule:create(X),
- rabbit_event:notify(exchange_created, info(X)),
- X;
- {existing, X} -> X;
- Err -> Err
- end.
+ {new, X};
+ [ExistingX] ->
+ {existing, ExistingX}
+ end
+ end,
+ fun ({new, Exchange}, Tx) ->
+ callback(Exchange, create, [Tx, Exchange]),
+ rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)),
+ Exchange;
+ ({existing, Exchange}, _Tx) ->
+ Exchange;
+ (Err, _Tx) ->
+ Err
+ end).
%% Used with atoms from records; e.g., the type is expected to exist.
type_to_module(T) ->
@@ -278,27 +278,28 @@ process_route(#resource{kind = queue} = QName,
{WorkList, SeenXs, QNames}) ->
{WorkList, SeenXs, [QName | QNames]}.
-call_with_exchange(XName, Fun) ->
+call_with_exchange(XName, Fun, PrePostCommitFun) ->
rabbit_misc:execute_mnesia_transaction(
fun () -> case mnesia:read({rabbit_exchange, XName}) of
[] -> {error, not_found};
[X] -> Fun(X)
end
- end).
+ end, PrePostCommitFun).
delete(XName, IfUnused) ->
- Fun = case IfUnused of
- true -> fun conditional_delete/1;
- false -> fun unconditional_delete/1
- end,
- case call_with_exchange(XName, Fun) of
- {deleted, X, Bs, Deletions} ->
- ok = rabbit_binding:process_deletions(
- rabbit_binding:add_deletion(
- XName, {X, deleted, Bs}, Deletions));
- Error = {error, _InUseOrNotFound} ->
- Error
- end.
+ call_with_exchange(
+ XName,
+ case IfUnused of
+ true -> fun conditional_delete/1;
+ false -> fun unconditional_delete/1
+ end,
+ fun ({deleted, X, Bs, Deletions}, Tx) ->
+ ok = rabbit_binding:process_deletions(
+ rabbit_binding:add_deletion(
+ XName, {X, deleted, Bs}, Deletions), Tx);
+ (Error = {error, _InUseOrNotFound}, _Tx) ->
+ Error
+ end).
maybe_auto_delete(#exchange{auto_delete = false}) ->
not_deleted;
@@ -308,6 +309,9 @@ maybe_auto_delete(#exchange{auto_delete = true} = X) ->
{deleted, X, [], Deletions} -> {deleted, Deletions}
end.
+callback(#exchange{type = XType}, Fun, Args) ->
+ apply(type_to_module(XType), Fun, Args).
+
conditional_delete(X = #exchange{name = XName}) ->
case rabbit_binding:has_for_source(XName) of
false -> unconditional_delete(X);
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index 742944dc..8b90cbc4 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -42,19 +42,19 @@ behaviour_info(callbacks) ->
{validate, 1},
%% called after declaration when previously absent
- {create, 1},
+ {create, 2},
%% called when recovering
{recover, 2},
%% called after exchange deletion.
- {delete, 2},
+ {delete, 3},
%% called after a binding has been added
- {add_binding, 2},
+ {add_binding, 3},
%% called after bindings have been deleted.
- {remove_bindings, 2},
+ {remove_bindings, 3},
%% called when comparing exchanges for equivalence - should return ok or
%% exit with #amqp_error{}
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index d49d0199..adb47cc0 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -35,8 +35,8 @@
-behaviour(rabbit_exchange_type).
-export([description/0, route/2]).
--export([validate/1, create/1, recover/2, delete/2,
- add_binding/2, remove_bindings/2, assert_args_equivalence/2]).
+-export([validate/1, create/2, recover/2, delete/3,
+ add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
@@ -55,10 +55,10 @@ route(#exchange{name = Name},
rabbit_router:match_routing_key(Name, RoutingKey).
validate(_X) -> ok.
-create(_X) -> ok.
+create(_Tx, _X) -> ok.
recover(_X, _Bs) -> ok.
-delete(_X, _Bs) -> ok.
-add_binding(_X, _B) -> ok.
-remove_bindings(_X, _Bs) -> ok.
+delete(_Tx, _X, _Bs) -> ok.
+add_binding(_Tx, _X, _B) -> ok.
+remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index e7f75464..5266dd87 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -35,8 +35,8 @@
-behaviour(rabbit_exchange_type).
-export([description/0, route/2]).
--export([validate/1, create/1, recover/2, delete/2, add_binding/2,
- remove_bindings/2, assert_args_equivalence/2]).
+-export([validate/1, create/2, recover/2, delete/3, add_binding/3,
+ remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
@@ -54,10 +54,10 @@ route(#exchange{name = Name}, _Delivery) ->
rabbit_router:match_routing_key(Name, '_').
validate(_X) -> ok.
-create(_X) -> ok.
+create(_Tx, _X) -> ok.
recover(_X, _Bs) -> ok.
-delete(_X, _Bs) -> ok.
-add_binding(_X, _B) -> ok.
-remove_bindings(_X, _Bs) -> ok.
+delete(_Tx, _X, _Bs) -> ok.
+add_binding(_Tx, _X, _B) -> ok.
+remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index caf141fe..efe0ec88 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -36,8 +36,8 @@
-behaviour(rabbit_exchange_type).
-export([description/0, route/2]).
--export([validate/1, create/1, recover/2, delete/2, add_binding/2,
- remove_bindings/2, assert_args_equivalence/2]).
+-export([validate/1, create/2, recover/2, delete/3, add_binding/3,
+ remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
@@ -128,10 +128,10 @@ headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind).
validate(_X) -> ok.
-create(_X) -> ok.
+create(_Tx, _X) -> ok.
recover(_X, _Bs) -> ok.
-delete(_X, _Bs) -> ok.
-add_binding(_X, _B) -> ok.
-remove_bindings(_X, _Bs) -> ok.
+delete(_Tx, _X, _Bs) -> ok.
+add_binding(_Tx, _X, _B) -> ok.
+remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 44851858..2f0d47a7 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -35,8 +35,8 @@
-behaviour(rabbit_exchange_type).
-export([description/0, route/2]).
--export([validate/1, create/1, recover/2, delete/2, add_binding/2,
- remove_bindings/2, assert_args_equivalence/2]).
+-export([validate/1, create/2, recover/2, delete/3, add_binding/3,
+ remove_bindings/3, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
@@ -94,10 +94,10 @@ last_topic_match(P, R, [BacktrackNext | BacktrackList]) ->
last_topic_match(P, [BacktrackNext | R], BacktrackList).
validate(_X) -> ok.
-create(_X) -> ok.
+create(_Tx, _X) -> ok.
recover(_X, _Bs) -> ok.
-delete(_X, _Bs) -> ok.
-add_binding(_X, _B) -> ok.
-remove_bindings(_X, _Bs) -> ok.
+delete(_Tx, _X, _Bs) -> ok.
+add_binding(_Tx, _X, _B) -> ok.
+remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
deleted file mode 100644
index cb53185f..00000000
--- a/src/rabbit_framing_channel.erl
+++ /dev/null
@@ -1,129 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2010 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2010 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_framing_channel).
--include("rabbit.hrl").
-
--export([start_link/3, process/2, shutdown/1]).
-
-%% internal
--export([mainloop/3]).
-
-%%--------------------------------------------------------------------
-
-start_link(Parent, ChannelPid, Protocol) ->
- {ok, proc_lib:spawn_link(
- fun () -> mainloop(Parent, ChannelPid, Protocol) end)}.
-
-process(Pid, Frame) ->
- Pid ! {frame, Frame},
- ok.
-
-shutdown(Pid) ->
- Pid ! terminate,
- ok.
-
-%%--------------------------------------------------------------------
-
-read_frame(ChannelPid) ->
- receive
- {frame, Frame} -> Frame;
- terminate -> rabbit_channel:shutdown(ChannelPid),
- read_frame(ChannelPid);
- Msg -> exit({unexpected_message, Msg})
- end.
-
-mainloop(Parent, ChannelPid, Protocol) ->
- case read_frame(ChannelPid) of
- {method, MethodName, FieldsBin} ->
- Method = Protocol:decode_method_fields(MethodName, FieldsBin),
- case Protocol:method_has_content(MethodName) of
- true -> {ClassId, _MethodId} = Protocol:method_id(MethodName),
- case collect_content(ChannelPid, ClassId, Protocol) of
- {ok, Content} ->
- rabbit_channel:do(ChannelPid, Method, Content),
- ?MODULE:mainloop(Parent, ChannelPid, Protocol);
- {error, Reason} ->
- channel_exit(Parent, Reason, MethodName)
- end;
- false -> rabbit_channel:do(ChannelPid, Method),
- ?MODULE:mainloop(Parent, ChannelPid, Protocol)
- end;
- _ ->
- channel_exit(Parent, {unexpected_frame,
- "expected method frame, "
- "got non method frame instead",
- []}, none)
- end.
-
-collect_content(ChannelPid, ClassId, Protocol) ->
- case read_frame(ChannelPid) of
- {content_header, ClassId, 0, BodySize, PropertiesBin} ->
- case collect_content_payload(ChannelPid, BodySize, []) of
- {ok, Payload} -> {ok, #content{
- class_id = ClassId,
- properties = none,
- properties_bin = PropertiesBin,
- protocol = Protocol,
- payload_fragments_rev = Payload}};
- Error -> Error
- end;
- {content_header, HeaderClassId, 0, _BodySize, _PropertiesBin} ->
- {error, {unexpected_frame,
- "expected content header for class ~w, "
- "got one for class ~w instead",
- [ClassId, HeaderClassId]}};
- _ ->
- {error, {unexpected_frame,
- "expected content header for class ~w, "
- "got non content header frame instead",
- [ClassId]}}
- end.
-
-collect_content_payload(_ChannelPid, 0, Acc) ->
- {ok, Acc};
-collect_content_payload(ChannelPid, RemainingByteCount, Acc) ->
- case read_frame(ChannelPid) of
- {content_body, FragmentBin} ->
- collect_content_payload(ChannelPid,
- RemainingByteCount - size(FragmentBin),
- [FragmentBin | Acc]);
- _ ->
- {error, {unexpected_frame,
- "expected content body, "
- "got non content body frame instead",
- []}}
- end.
-
-channel_exit(Parent, {ErrorName, ExplanationFormat, Params}, MethodName) ->
- Reason = rabbit_misc:amqp_error(ErrorName, ExplanationFormat, Params,
- MethodName),
- Parent ! {channel_exit, self(), Reason}.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 15ba787a..9e8ba91b 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -48,6 +48,8 @@
-export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]).
-export([with_user/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
+-export([execute_mnesia_transaction/2]).
+-export([execute_mnesia_tx_with_tail/1]).
-export([ensure_ok/2]).
-export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]).
-export([upmap/2, map_in_order/2]).
@@ -67,6 +69,7 @@
-export([all_module_attributes/1, build_acyclic_graph/3]).
-export([now_ms/0]).
-export([lock_file/1]).
+-export([const_ok/1, const/1]).
%%----------------------------------------------------------------------------
@@ -142,6 +145,10 @@
(rabbit_types:username(), rabbit_types:vhost(), thunk(A))
-> A).
-spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A).
+-spec(execute_mnesia_transaction/2 ::
+ (thunk(A), fun ((A, boolean()) -> B)) -> B).
+-spec(execute_mnesia_tx_with_tail/1 ::
+ (thunk(fun ((boolean()) -> B))) -> B | (fun ((boolean()) -> B))).
-spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok').
-spec(makenode/1 :: ({string(), string()} | string()) -> node()).
-spec(nodeparts/1 :: (node() | string()) -> {string(), string()}).
@@ -196,6 +203,8 @@
digraph:vertex(), digraph:vertex()})).
-spec(now_ms/0 :: () -> non_neg_integer()).
-spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')).
+-spec(const_ok/1 :: (any()) -> 'ok').
+-spec(const/1 :: (A) -> fun ((_) -> A)).
-endif.
@@ -377,6 +386,35 @@ execute_mnesia_transaction(TxFun) ->
{aborted, Reason} -> throw({error, Reason})
end.
+
+%% Like execute_mnesia_transaction/1 with additional Pre- and Post-
+%% commit function
+execute_mnesia_transaction(TxFun, PrePostCommitFun) ->
+ case mnesia:is_transaction() of
+ true -> throw(unexpected_transaction);
+ false -> ok
+ end,
+ PrePostCommitFun(execute_mnesia_transaction(
+ fun () ->
+ Result = TxFun(),
+ PrePostCommitFun(Result, true),
+ Result
+ end), false).
+
+%% Like execute_mnesia_transaction/2, but TxFun is expected to return a
+%% TailFun which gets called immediately before and after the tx commit
+execute_mnesia_tx_with_tail(TxFun) ->
+ case mnesia:is_transaction() of
+ true -> execute_mnesia_transaction(TxFun);
+ false -> TailFun = execute_mnesia_transaction(
+ fun () ->
+ TailFun1 = TxFun(),
+ TailFun1(true),
+ TailFun1
+ end),
+ TailFun(false)
+ end.
+
ensure_ok(ok, _) -> ok;
ensure_ok({error, Reason}, ErrorTag) -> throw({error, {ErrorTag, Reason}}).
@@ -805,3 +843,6 @@ lock_file(Path) ->
false -> {ok, Lock} = file:open(Path, [write]),
ok = file:close(Lock)
end.
+
+const_ok(_) -> ok.
+const(X) -> fun (_) -> X end.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 2e1834c7..f8b41ed3 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -81,6 +81,7 @@
file_summary_ets, %% tid of the file summary table
dedup_cache_ets, %% tid of dedup cache table
cur_file_cache_ets, %% tid of current file cache table
+ dying_clients, %% set of dying clients
client_refs, %% set of references of all registered clients
successfully_recovered, %% boolean: did we recover state?
file_size_limit, %% how big are our files allowed to get?
@@ -306,6 +307,17 @@
%% sure that reads are not attempted from files which are in the
%% process of being garbage collected.
%%
+%% When a message is removed, its reference count is decremented. Even
+%% if the reference count becomes 0, its entry is not removed. This is
+%% because in the event of the same message being sent to several
+%% different queues, there is the possibility of one queue writing and
+%% removing the message before other queues write it at all. Thus
+%% accomodating 0-reference counts allows us to avoid unnecessary
+%% writes here. Of course, there are complications: the file to which
+%% the message has already been written could be locked pending
+%% deletion or GC, which means we have to rewrite the message as the
+%% original copy will now be lost.
+%%
%% The server automatically defers reads, removes and contains calls
%% that occur which refer to files which are currently being
%% GC'd. Contains calls are only deferred in order to ensure they do
@@ -323,6 +335,55 @@
%% heavily overloaded, clients can still write and read messages with
%% very low latency and not block at all.
%%
+%% Clients of the msg_store are required to register before using the
+%% msg_store. This provides them with the necessary client-side state
+%% to allow them to directly access the various caches and files. When
+%% they terminate, they should deregister. They can do this by calling
+%% either client_terminate/1 or client_delete_and_terminate/1. The
+%% differences are: (a) client_terminate is synchronous. As a result,
+%% if the msg_store is badly overloaded and has lots of in-flight
+%% writes and removes to process, this will take some time to
+%% return. However, once it does return, you can be sure that all the
+%% actions you've issued to the msg_store have been processed. (b) Not
+%% only is client_delete_and_terminate/1 asynchronous, but it also
+%% permits writes and subsequent removes from the current
+%% (terminating) client which are still in flight to be safely
+%% ignored. Thus from the point of view of the msg_store itself, and
+%% all from the same client:
+%%
+%% (T) = termination; (WN) = write of msg N; (RN) = remove of msg N
+%% --> W1, W2, W1, R1, T, W3, R2, W2, R1, R2, R3, W4 -->
+%%
+%% The client obviously sent T after all the other messages (up to
+%% W4), but because the msg_store prioritises messages, the T can be
+%% promoted and thus received early.
+%%
+%% Thus at the point of the msg_store receiving T, we have messages 1
+%% and 2 with a refcount of 1. After T, W3 will be ignored because
+%% it's an unknown message, as will R3, and W4. W2, R1 and R2 won't be
+%% ignored because the messages that they refer to were already known
+%% to the msg_store prior to T. However, it can be a little more
+%% complex: after the first R2, the refcount of msg 2 is 0. At that
+%% point, if a GC occurs or file deletion, msg 2 could vanish, which
+%% would then mean that the subsequent W2 and R2 are then ignored.
+%%
+%% The use case then for client_delete_and_terminate/1 is if the
+%% client wishes to remove everything it's written to the msg_store:
+%% it issues removes for all messages it's written and not removed,
+%% and then calls client_delete_and_terminate/1. At that point, any
+%% in-flight writes (and subsequent removes) can be ignored, but
+%% removes and writes for messages the msg_store already knows about
+%% will continue to be processed normally (which will normally just
+%% involve modifying the reference count, which is fast). Thus we save
+%% disk bandwidth for writes which are going to be immediately removed
+%% again by the the terminating client.
+%%
+%% We use a separate set to keep track of the dying clients in order
+%% to keep that set, which is inspected on every write and remove, as
+%% small as possible. Inspecting client_refs - the set of all clients
+%% - would degrade performance with many healthy clients and few, if
+%% any, dying clients, which is the typical case.
+%%
%% For notes on Clean Shutdown and startup, see documentation in
%% variable_queue.
@@ -361,6 +422,7 @@ client_terminate(CState = #client_msstate { client_ref = Ref }) ->
client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
close_all_handles(CState),
+ ok = server_cast(CState, {client_dying, Ref}),
ok = server_cast(CState, {client_delete, Ref}).
client_ref(#client_msstate { client_ref = Ref }) -> Ref.
@@ -598,6 +660,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
+ dying_clients = sets:new(),
client_refs = ClientRefs1,
successfully_recovered = CleanShutdown,
file_size_limit = FileSizeLimit,
@@ -643,6 +706,7 @@ prioritise_cast(Msg, _State) ->
{combine_files, _Source, _Destination, _Reclaimed} -> 8;
{delete_file, _File, _Reclaimed} -> 8;
{set_maximum_since_use, _Age} -> 8;
+ {client_dying, _Pid} -> 7;
_ -> 0
end.
@@ -681,65 +745,65 @@ handle_call({contains, Guid}, From, State) ->
State1 = contains_message(Guid, From, State),
noreply(State1).
+handle_cast({client_dying, CRef},
+ State = #msstate { dying_clients = DyingClients }) ->
+ DyingClients1 = sets:add_element(CRef, DyingClients),
+ write_message(CRef, <<>>, State #msstate { dying_clients = DyingClients1 });
+
handle_cast({client_delete, CRef},
- State = #msstate { client_refs = ClientRefs }) ->
- State1 = clear_client_callback(CRef, State),
- noreply(State1 #msstate {
- client_refs = sets:del_element(CRef, ClientRefs) });
+ State = #msstate { client_refs = ClientRefs,
+ dying_clients = DyingClients }) ->
+ State1 = clear_client_callback(
+ CRef, State #msstate {
+ client_refs = sets:del_element(CRef, ClientRefs),
+ dying_clients = sets:del_element(CRef, DyingClients) }),
+ noreply(remove_message(CRef, CRef, State1));
handle_cast({write, CRef, Guid},
- State = #msstate { sum_valid_data = SumValid,
- file_summary_ets = FileSummaryEts,
- current_file = CurFile,
+ State = #msstate { file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
client_ondisk_callback = CODC,
cref_to_guids = CTG }) ->
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
[{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
- CTG1 = case dict:find(CRef, CODC) of
- {ok, _} -> dict:update(CRef, fun(Guids) ->
- gb_sets:add(Guid, Guids)
- end,
- gb_sets:singleton(Guid), CTG);
- error -> CTG
- end,
+ CTG1 = add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC),
State1 = State #msstate { cref_to_guids = CTG1 },
- case index_lookup(Guid, State1) of
- not_found ->
+ case should_mask_action(CRef, Guid, State) of
+ {true, _Location} ->
+ noreply(State);
+ {false, not_found} ->
write_message(Guid, Msg, State1);
- #msg_location { ref_count = 0, file = File, total_size = TotalSize } ->
- case ets:lookup(FileSummaryEts, File) of
- [#file_summary { locked = true }] ->
+ {Mask, #msg_location { ref_count = 0, file = File,
+ total_size = TotalSize }} ->
+ case {Mask, ets:lookup(FileSummaryEts, File)} of
+ {false, [#file_summary { locked = true }]} ->
ok = index_delete(Guid, State1),
write_message(Guid, Msg, State1);
- [#file_summary {}] ->
- ok = index_update_ref_count(Guid, 1, State1),
- [_] = ets:update_counter(
- FileSummaryEts, File,
- [{#file_summary.valid_total_size, TotalSize}]),
- noreply(State1 #msstate {
- sum_valid_data = SumValid + TotalSize })
+ {false_if_increment, [#file_summary { locked = true }]} ->
+ %% The msg for Guid is older than the client death
+ %% message, but as it is being GC'd currently,
+ %% we'll have to write a new copy, which will then
+ %% be younger, so ignore this write.
+ noreply(State);
+ {_Mask, [#file_summary {}]} ->
+ ok = index_update_ref_count(Guid, 1, State),
+ State2 = client_confirm_if_on_disk(CRef, Guid, File, State),
+ noreply(adjust_valid_total_size(File, TotalSize, State2))
end;
- #msg_location { ref_count = RefCount, file = File } ->
+ {_Mask, #msg_location { ref_count = RefCount, file = File }} ->
%% We already know about it, just update counter. Only
%% update field otherwise bad interaction with concurrent GC
- ok = index_update_ref_count(Guid, RefCount + 1, State1),
- CTG2 = case {dict:find(CRef, CODC), File} of
- {{ok, _}, CurFile} -> CTG1;
- {{ok, Fun}, _} -> Fun(gb_sets:singleton(Guid)),
- CTG;
- _ -> CTG1
- end,
- noreply(State #msstate { cref_to_guids = CTG2 })
+ ok = index_update_ref_count(Guid, RefCount + 1, State),
+ noreply(client_confirm_if_on_disk(CRef, Guid, File, State))
end;
handle_cast({remove, CRef, Guids}, State) ->
State1 = lists:foldl(
- fun (Guid, State2) -> remove_message(Guid, State2) end,
+ fun (Guid, State2) -> remove_message(Guid, CRef, State2) end,
State, Guids),
- State2 = client_confirm(CRef, gb_sets:from_list(Guids), State1),
- noreply(maybe_compact(State2));
+ noreply(maybe_compact(
+ client_confirm(CRef, gb_sets:from_list(Guids), removed, State1)));
handle_cast({release, Guids}, State =
#msstate { dedup_cache_ets = DedupCacheEts }) ->
@@ -861,9 +925,9 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) ->
{ok, cancel} = timer:cancel(TRef),
State #msstate { sync_timer_ref = undefined }.
-internal_sync(State = #msstate { current_file_handle = CurHdl,
- on_sync = Syncs,
- cref_to_guids = CTG }) ->
+internal_sync(State = #msstate { current_file_handle = CurHdl,
+ on_sync = Syncs,
+ cref_to_guids = CTG }) ->
State1 = stop_sync_timer(State),
CGs = dict:fold(fun (CRef, Guids, NS) ->
case gb_sets:is_empty(Guids) of
@@ -871,14 +935,14 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
false -> [{CRef, Guids} | NS]
end
end, [], CTG),
- if Syncs =:= [] andalso CGs =:= [] -> ok;
- true -> file_handle_cache:sync(CurHdl)
+ case {Syncs, CGs} of
+ {[], []} -> ok;
+ _ -> file_handle_cache:sync(CurHdl)
end,
- lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)),
- [client_confirm(CRef, Guids, State1) || {CRef, Guids} <- CGs],
+ [K() || K <- lists:reverse(Syncs)],
+ [client_confirm(CRef, Guids, written, State1) || {CRef, Guids} <- CGs],
State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }.
-
write_message(Guid, Msg,
State = #msstate { current_file_handle = CurHdl,
current_file = CurFile,
@@ -990,34 +1054,43 @@ contains_message(Guid, From,
end
end.
-remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
- file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts }) ->
- #msg_location { ref_count = RefCount, file = File,
- total_size = TotalSize } =
- index_lookup_positive_ref_count(Guid, State),
- %% only update field, otherwise bad interaction with concurrent GC
- Dec = fun () -> index_update_ref_count(Guid, RefCount - 1, State) end,
- case RefCount of
- %% don't remove from CUR_FILE_CACHE_ETS_NAME here because
- %% there may be further writes in the mailbox for the same
- %% msg.
- 1 -> ok = remove_cache_entry(DedupCacheEts, Guid),
- case ets:lookup(FileSummaryEts, File) of
- [#file_summary { locked = true } ] ->
- add_to_pending_gc_completion({remove, Guid}, File, State);
- [#file_summary {}] ->
+remove_message(Guid, CRef,
+ State = #msstate { file_summary_ets = FileSummaryEts,
+ dedup_cache_ets = DedupCacheEts }) ->
+ case should_mask_action(CRef, Guid, State) of
+ {true, _Location} ->
+ State;
+ {false_if_increment, #msg_location { ref_count = 0 }} ->
+ %% CRef has tried to both write and remove this msg
+ %% whilst it's being GC'd. ASSERTION:
+ %% [#file_summary { locked = true }] =
+ %% ets:lookup(FileSummaryEts, File),
+ State;
+ {_Mask, #msg_location { ref_count = RefCount, file = File,
+ total_size = TotalSize }} when RefCount > 0 ->
+ %% only update field, otherwise bad interaction with
+ %% concurrent GC
+ Dec =
+ fun () -> index_update_ref_count(Guid, RefCount - 1, State) end,
+ case RefCount of
+ %% don't remove from CUR_FILE_CACHE_ETS_NAME here
+ %% because there may be further writes in the mailbox
+ %% for the same msg.
+ 1 -> ok = remove_cache_entry(DedupCacheEts, Guid),
+ case ets:lookup(FileSummaryEts, File) of
+ [#file_summary { locked = true }] ->
+ add_to_pending_gc_completion(
+ {remove, Guid, CRef}, File, State);
+ [#file_summary {}] ->
+ ok = Dec(),
+ delete_file_if_empty(
+ File, adjust_valid_total_size(File, -TotalSize,
+ State))
+ end;
+ _ -> ok = decrement_cache(DedupCacheEts, Guid),
ok = Dec(),
- [_] = ets:update_counter(
- FileSummaryEts, File,
- [{#file_summary.valid_total_size, -TotalSize}]),
- delete_file_if_empty(
- File, State #msstate {
- sum_valid_data = SumValid - TotalSize })
- end;
- _ -> ok = decrement_cache(DedupCacheEts, Guid),
- ok = Dec(),
- State
+ State
+ end
end.
add_to_pending_gc_completion(
@@ -1039,8 +1112,8 @@ run_pending_action({read, Guid, From}, State) ->
read_message(Guid, From, State);
run_pending_action({contains, Guid, From}, State) ->
contains_message(Guid, From, State);
-run_pending_action({remove, Guid}, State) ->
- remove_message(Guid, State).
+run_pending_action({remove, Guid, CRef}, State) ->
+ remove_message(Guid, CRef, State).
safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) ->
try
@@ -1051,15 +1124,22 @@ safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) ->
safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) ->
safe_ets_update_counter(Tab, Key, UpdateOp, fun (_) -> ok end, FailThunk).
+adjust_valid_total_size(File, Delta, State = #msstate {
+ sum_valid_data = SumValid,
+ file_summary_ets = FileSummaryEts }) ->
+ [_] = ets:update_counter(FileSummaryEts, File,
+ [{#file_summary.valid_total_size, Delta}]),
+ State #msstate { sum_valid_data = SumValid + Delta }.
+
orddict_store(Key, Val, Dict) ->
false = orddict:is_key(Key, Dict),
orddict:store(Key, Val, Dict).
-client_confirm(CRef, Guids,
+client_confirm(CRef, Guids, ActionTaken,
State = #msstate { client_ondisk_callback = CODC,
cref_to_guids = CTG }) ->
case dict:find(CRef, CODC) of
- {ok, Fun} -> Fun(Guids),
+ {ok, Fun} -> Fun(Guids, ActionTaken),
CTG1 = case dict:find(CRef, CTG) of
{ok, Gs} ->
Guids1 = gb_sets:difference(Gs, Guids),
@@ -1073,6 +1153,52 @@ client_confirm(CRef, Guids,
error -> State
end.
+add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC) ->
+ case dict:find(CRef, CODC) of
+ {ok, _} -> dict:update(CRef,
+ fun (Guids) -> gb_sets:add(Guid, Guids) end,
+ gb_sets:singleton(Guid), CTG);
+ error -> CTG
+ end.
+
+client_confirm_if_on_disk(CRef, Guid, File,
+ State = #msstate { client_ondisk_callback = CODC,
+ current_file = CurFile,
+ cref_to_guids = CTG }) ->
+ CTG1 =
+ case File of
+ CurFile -> add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC);
+ _ -> case dict:find(CRef, CODC) of
+ {ok, Fun} -> Fun(gb_sets:singleton(Guid), written);
+ _ -> ok
+ end,
+ CTG
+ end,
+ State #msstate { cref_to_guids = CTG1 }.
+
+%% Detect whether the Guid is older or younger than the client's death
+%% msg (if there is one). If the msg is older than the client death
+%% msg, and it has a 0 ref_count we must only alter the ref_count, not
+%% rewrite the msg - rewriting it would make it younger than the death
+%% msg and thus should be ignored. Note that this (correctly) returns
+%% false when testing to remove the death msg itself.
+should_mask_action(CRef, Guid,
+ State = #msstate { dying_clients = DyingClients }) ->
+ case {sets:is_element(CRef, DyingClients), index_lookup(Guid, State)} of
+ {false, Location} ->
+ {false, Location};
+ {true, not_found} ->
+ {true, not_found};
+ {true, #msg_location { file = File, offset = Offset,
+ ref_count = RefCount } = Location} ->
+ #msg_location { file = DeathFile, offset = DeathOffset } =
+ index_lookup(CRef, State),
+ {case {{DeathFile, DeathOffset} < {File, Offset}, RefCount} of
+ {true, _} -> true;
+ {false, 0} -> false_if_increment;
+ {false, _} -> false
+ end, Location}
+ end.
%%----------------------------------------------------------------------------
%% file helper functions
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 89954b06..c6a083bb 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -32,7 +32,7 @@
-module(rabbit_net).
-include("rabbit.hrl").
--export([is_ssl/1, controlling_process/2, getstat/2,
+-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
async_recv/3, port_command/2, send/2, close/1,
sockname/1, peername/1, peercert/1]).
@@ -50,6 +50,9 @@
-type(socket() :: port() | #ssl_socket{}).
-spec(is_ssl/1 :: (socket()) -> boolean()).
+-spec(ssl_info/1 :: (socket())
+ -> 'nossl' | ok_val_or_error(
+ {atom(), {atom(), atom(), atom()}})).
-spec(controlling_process/2 :: (socket(), pid()) -> ok_or_any_error()).
-spec(getstat/2 ::
(socket(), [stat_option()])
@@ -77,6 +80,11 @@
is_ssl(Sock) -> ?IS_SSL(Sock).
+ssl_info(Sock) when ?IS_SSL(Sock) ->
+ ssl:connection_info(Sock#ssl_socket.ssl);
+ssl_info(_Sock) ->
+ nossl.
+
controlling_process(Sock, Pid) when ?IS_SSL(Sock) ->
ssl:controlling_process(Sock#ssl_socket.ssl, Pid);
controlling_process(Sock, Pid) when is_port(Sock) ->
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 76c0a4ef..2162104f 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -33,7 +33,7 @@
-export([init/2, shutdown_terms/1, recover/5,
terminate/2, delete_and_terminate/1,
- publish/5, deliver/2, ack/2, sync/2, flush/1, read/3,
+ publish/5, deliver/2, ack/2, sync/1, sync/2, flush/1, read/3,
next_segment_boundary/1, bounds/1, recover/1]).
-export([add_queue_ttl/0]).
@@ -297,11 +297,12 @@ deliver(SeqIds, State) ->
ack(SeqIds, State) ->
deliver_or_ack(ack, SeqIds, State).
-sync([], State) ->
- State;
-sync(_SeqIds, State = #qistate { journal_handle = undefined }) ->
- State;
-sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
+%% This is only called when there are outstanding confirms and the
+%% queue is idle.
+sync(State = #qistate { unsynced_guids = Guids }) ->
+ sync_if([] =/= Guids, State).
+
+sync(SeqIds, State) ->
%% The SeqIds here contains the SeqId of every publish and ack in
%% the transaction. Ideally we should go through these seqids and
%% only sync the journal if the pubs or acks appear in the
@@ -309,9 +310,8 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
%% the variable queue publishes and acks to the qi, and then
%% syncs, all in one operation, there is no possibility of the
%% seqids not being in the journal, provided the transaction isn't
- %% emptied (handled above anyway).
- ok = file_handle_cache:sync(JournalHdl),
- notify_sync(State).
+ %% emptied (handled by sync_if anyway).
+ sync_if([] =/= SeqIds, State).
flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State) -> flush_journal(State).
@@ -723,6 +723,14 @@ deliver_or_ack(Kind, SeqIds, State) ->
add_to_journal(SeqId, Kind, StateN)
end, State1, SeqIds)).
+sync_if(false, State) ->
+ State;
+sync_if(_Bool, State = #qistate { journal_handle = undefined }) ->
+ State;
+sync_if(true, State = #qistate { journal_handle = JournalHdl }) ->
+ ok = file_handle_cache:sync(JournalHdl),
+ notify_sync(State).
+
notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) ->
OnSyncFun(gb_sets:from_list(UG)),
State #qistate { unsynced_guids = [] }.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index e87ff879..08bc18ba 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -41,7 +41,7 @@
-export([conserve_memory/2, server_properties/0]).
--export([analyze_frame/3]).
+-export([process_channel_frame/5]). %% used by erlang-client
-export([emit_stats/1]).
@@ -65,6 +65,8 @@
-define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl,
peer_cert_subject, peer_cert_issuer,
peer_cert_validity, auth_mechanism,
+ ssl_protocol, ssl_key_exchange,
+ ssl_cipher, ssl_hash,
protocol, user, vhost, timeout, frame_max,
client_properties]).
@@ -347,12 +349,12 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
%% since this termination is initiated by our parent it is
%% probably more important to exit quickly.
exit(Reason);
- {channel_exit, _Chan, E = {writer, send_failed, _Error}} ->
+ {channel_exit, _Channel, E = {writer, send_failed, _Error}} ->
throw(E);
- {channel_exit, ChannelOrFrPid, Reason} ->
- mainloop(Deb, handle_channel_exit(ChannelOrFrPid, Reason, State));
- {'DOWN', _MRef, process, ChSupPid, Reason} ->
- mainloop(Deb, handle_dependent_exit(ChSupPid, Reason, State));
+ {channel_exit, Channel, Reason} ->
+ mainloop(Deb, handle_exception(State, Channel, Reason));
+ {'DOWN', _MRef, process, ChPid, Reason} ->
+ mainloop(Deb, handle_dependent_exit(ChPid, Reason, State));
terminate_connection ->
State;
handshake_timeout ->
@@ -443,45 +445,32 @@ close_channel(Channel, State) ->
put({channel, Channel}, closing),
State.
-handle_channel_exit(ChFrPid, Reason, State) when is_pid(ChFrPid) ->
- {channel, Channel} = get({ch_fr_pid, ChFrPid}),
- handle_exception(State, Channel, Reason);
-handle_channel_exit(Channel, Reason, State) ->
- handle_exception(State, Channel, Reason).
-
-handle_dependent_exit(ChSupPid, Reason, State) ->
+handle_dependent_exit(ChPid, Reason, State) ->
case termination_kind(Reason) of
controlled ->
- case erase({ch_sup_pid, ChSupPid}) of
- undefined -> ok;
- {_Channel, {ch_fr_pid, _ChFrPid} = ChFr} -> erase(ChFr)
- end,
+ erase({ch_pid, ChPid}),
maybe_close(State);
uncontrolled ->
- case channel_cleanup(ChSupPid) of
- undefined ->
- exit({abnormal_dependent_exit, ChSupPid, Reason});
- Channel ->
- maybe_close(handle_exception(State, Channel, Reason))
+ case channel_cleanup(ChPid) of
+ undefined -> exit({abnormal_dependent_exit, ChPid, Reason});
+ Channel -> maybe_close(
+ handle_exception(State, Channel, Reason))
end
end.
-channel_cleanup(ChSupPid) ->
- case get({ch_sup_pid, ChSupPid}) of
- undefined -> undefined;
- {{channel, Channel}, ChFr} -> erase({channel, Channel}),
- erase(ChFr),
- erase({ch_sup_pid, ChSupPid}),
- Channel
+channel_cleanup(ChPid) ->
+ case get({ch_pid, ChPid}) of
+ undefined -> undefined;
+ Channel -> erase({channel, Channel}),
+ erase({ch_pid, ChPid}),
+ Channel
end.
-all_channels() -> [ChFrPid || {{ch_sup_pid, _ChSupPid},
- {_Channel, {ch_fr_pid, ChFrPid}}} <- get()].
+all_channels() -> [ChPid || {{ch_pid, ChPid}, _Channel} <- get()].
terminate_channels() ->
NChannels =
- length([rabbit_framing_channel:shutdown(ChFrPid)
- || ChFrPid <- all_channels()]),
+ length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]),
if NChannels > 0 ->
Timeout = 1000 * ?CHANNEL_TERMINATION_TIMEOUT * NChannels,
TimerRef = erlang:send_after(Timeout, self(), cancel_wait),
@@ -499,10 +488,10 @@ wait_for_channel_termination(0, TimerRef) ->
wait_for_channel_termination(N, TimerRef) ->
receive
- {'DOWN', _MRef, process, ChSupPid, Reason} ->
- case channel_cleanup(ChSupPid) of
+ {'DOWN', _MRef, process, ChPid, Reason} ->
+ case channel_cleanup(ChPid) of
undefined ->
- exit({abnormal_dependent_exit, ChSupPid, Reason});
+ exit({abnormal_dependent_exit, ChPid, Reason});
Channel ->
case termination_kind(Reason) of
controlled ->
@@ -533,15 +522,13 @@ maybe_close(State) ->
State.
termination_kind(normal) -> controlled;
-termination_kind(shutdown) -> controlled;
-termination_kind({shutdown, _Term}) -> controlled;
termination_kind(_) -> uncontrolled.
handle_frame(Type, 0, Payload,
State = #v1{connection_state = CS,
connection = #connection{protocol = Protocol}})
when CS =:= closing; CS =:= closed ->
- case analyze_frame(Type, Payload, Protocol) of
+ case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
_Other -> State
@@ -551,7 +538,7 @@ handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS})
State;
handle_frame(Type, 0, Payload,
State = #v1{connection = #connection{protocol = Protocol}}) ->
- case analyze_frame(Type, Payload, Protocol) of
+ case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
error -> throw({unknown_frame, 0, Type, Payload});
heartbeat -> State;
{method, MethodName, FieldsBin} ->
@@ -560,19 +547,23 @@ handle_frame(Type, 0, Payload,
end;
handle_frame(Type, Channel, Payload,
State = #v1{connection = #connection{protocol = Protocol}}) ->
- case analyze_frame(Type, Payload, Protocol) of
+ case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
error -> throw({unknown_frame, Channel, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
AnalyzedFrame ->
case get({channel, Channel}) of
- {ch_fr_pid, ChFrPid} ->
- ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame),
+ {ChPid, FramingState} ->
+ NewAState = process_channel_frame(
+ AnalyzedFrame, self(),
+ Channel, ChPid, FramingState),
+ put({channel, Channel}, {ChPid, NewAState}),
case AnalyzedFrame of
{method, 'channel.close', _} ->
erase({channel, Channel}),
State;
{method, MethodName, _} ->
- case (State#v1.connection_state =:= blocking andalso
+ case (State#v1.connection_state =:= blocking
+ andalso
Protocol:method_has_content(MethodName)) of
true -> State#v1{connection_state = blocked};
false -> State
@@ -601,9 +592,8 @@ handle_frame(Type, Channel, Payload,
State;
undefined ->
case ?IS_RUNNING(State) of
- true -> ok = send_to_new_channel(
- Channel, AnalyzedFrame, State),
- State;
+ true -> send_to_new_channel(
+ Channel, AnalyzedFrame, State);
false -> throw({channel_frame_while_starting,
Channel, State#v1.connection_state,
AnalyzedFrame})
@@ -611,22 +601,6 @@ handle_frame(Type, Channel, Payload,
end
end.
-analyze_frame(?FRAME_METHOD,
- <<ClassId:16, MethodId:16, MethodFields/binary>>,
- Protocol) ->
- MethodName = Protocol:lookup_method_name({ClassId, MethodId}),
- {method, MethodName, MethodFields};
-analyze_frame(?FRAME_HEADER,
- <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>,
- _Protocol) ->
- {content_header, ClassId, Weight, BodySize, Properties};
-analyze_frame(?FRAME_BODY, Body, _Protocol) ->
- {content_body, Body};
-analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) ->
- heartbeat;
-analyze_frame(_Type, _Body, _Protocol) ->
- error.
-
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
ensure_stats_timer(
switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
@@ -768,17 +742,10 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
not_allowed, "frame_max=~w > ~w max size",
[FrameMax, ?FRAME_MAX]);
true ->
- SendFun =
- fun() ->
- Frame = rabbit_binary_generator:build_heartbeat_frame(),
- catch rabbit_net:send(Sock, Frame)
- end,
-
+ Frame = rabbit_binary_generator:build_heartbeat_frame(),
+ SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
Parent = self(),
- ReceiveFun =
- fun() ->
- Parent ! timeout
- end,
+ ReceiveFun = fun() -> Parent ! timeout end,
Heartbeater = SHF(Sock, ClientHeartbeat, SendFun,
ClientHeartbeat, ReceiveFun),
State#v1{connection_state = opening,
@@ -809,7 +776,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
fun() -> internal_emit_stats(State1) end),
State1;
handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
- lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
+ lists:foreach(fun rabbit_channel:shutdown/1, all_channels()),
maybe_close(State#v1{connection_state = closing});
handle_method0(#'connection.close'{},
State = #v1{connection_state = CS,
@@ -905,6 +872,14 @@ i(peer_port, #v1{sock = Sock}) ->
socket_info(fun rabbit_net:peername/1, fun ({_, P}) -> P end, Sock);
i(ssl, #v1{sock = Sock}) ->
rabbit_net:is_ssl(Sock);
+i(ssl_protocol, #v1{sock = Sock}) ->
+ ssl_info(fun ({P, _}) -> P end, Sock);
+i(ssl_key_exchange, #v1{sock = Sock}) ->
+ ssl_info(fun ({_, {K, _, _}}) -> K end, Sock);
+i(ssl_cipher, #v1{sock = Sock}) ->
+ ssl_info(fun ({_, {_, C, _}}) -> C end, Sock);
+i(ssl_hash, #v1{sock = Sock}) ->
+ ssl_info(fun ({_, {_, _, H}}) -> H end, Sock);
i(peer_cert_issuer, #v1{sock = Sock}) ->
cert_info(fun rabbit_ssl:peer_cert_issuer/1, Sock);
i(peer_cert_subject, #v1{sock = Sock}) ->
@@ -955,6 +930,13 @@ socket_info(Get, Select) ->
{error, _} -> ''
end.
+ssl_info(F, Sock) ->
+ case rabbit_net:ssl_info(Sock) of
+ nossl -> '';
+ {error, _} -> '';
+ {ok, Info} -> F(Info)
+ end.
+
cert_info(F, Sock) ->
case rabbit_net:peercert(Sock) of
nossl -> '';
@@ -971,15 +953,29 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
frame_max = FrameMax,
user = User,
vhost = VHost}} = State,
- {ok, ChSupPid, ChFrPid} =
+ {ok, _ChSupPid, {ChPid, AState}} =
rabbit_channel_sup_sup:start_channel(
ChanSupSup, {Protocol, Sock, Channel, FrameMax,
self(), User, VHost, Collector}),
- erlang:monitor(process, ChSupPid),
- put({channel, Channel}, {ch_fr_pid, ChFrPid}),
- put({ch_sup_pid, ChSupPid}, {{channel, Channel}, {ch_fr_pid, ChFrPid}}),
- put({ch_fr_pid, ChFrPid}, {channel, Channel}),
- ok = rabbit_framing_channel:process(ChFrPid, AnalyzedFrame).
+ erlang:monitor(process, ChPid),
+ NewAState = process_channel_frame(AnalyzedFrame, self(),
+ Channel, ChPid, AState),
+ put({channel, Channel}, {ChPid, NewAState}),
+ put({ch_pid, ChPid}, Channel),
+ State.
+
+process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) ->
+ case rabbit_command_assembler:process(Frame, AState) of
+ {ok, NewAState} -> NewAState;
+ {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
+ NewAState;
+ {ok, Method, Content, NewAState} -> rabbit_channel:do(ChPid,
+ Method, Content),
+ NewAState;
+ {error, Reason} -> ErrPid ! {channel_exit, Channel,
+ Reason},
+ AState
+ end.
log_channel_error(ConnectionState, Channel, Reason) ->
rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n",
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 8ceb4410..d913092c 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1696,7 +1696,7 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
false -> ?TRANSIENT_MSG_STORE
end,
MSCState = rabbit_msg_store:client_init(MsgStore, Ref, undefined),
- {A, B} =
+ {A, B = [{_SeqId, LastGuidWritten} | _]} =
lists:foldl(
fun (SeqId, {QiN, SeqIdsGuidsAcc}) ->
Guid = rabbit_guid:guid(),
@@ -1705,6 +1705,8 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
ok = rabbit_msg_store:write(Guid, Guid, MSCState),
{QiM, [{SeqId, Guid} | SeqIdsGuidsAcc]}
end, {Qi, []}, SeqIds),
+ %% do this just to force all of the publishes through to the msg_store:
+ true = rabbit_msg_store:contains(LastGuidWritten, MSCState),
ok = rabbit_msg_store:client_delete_and_terminate(MSCState),
{A, B}.
@@ -1888,7 +1890,7 @@ assert_props(List, PropVals) ->
with_fresh_variable_queue(Fun) ->
ok = empty_test_queue(),
VQ = rabbit_variable_queue:init(test_queue(), true, false,
- fun nop/1, fun nop/1),
+ fun nop/2, fun nop/1),
S0 = rabbit_variable_queue:status(VQ),
assert_props(S0, [{q1, 0}, {q2, 0},
{delta, {delta, undefined, 0, undefined}},
@@ -1990,7 +1992,7 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
%% drain
{VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7),
- {_, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8),
+ VQ9 = rabbit_variable_queue:ack(AckTags, VQ8),
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -2000,7 +2002,7 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
{{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
- {_, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2),
+ VQ3 = rabbit_variable_queue:ack([AckTag], VQ2),
publish_fetch_and_ack(N-1, Len, VQ3).
test_variable_queue_partial_segments_delta_thing(VQ0) ->
@@ -2034,7 +2036,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
{len, HalfSegment + 1}]),
{VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
HalfSegment + 1, VQ7),
- {_, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
+ VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
%% should be empty now
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -2064,7 +2066,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
- fun nop/1, fun nop/1),
+ fun nop/2, fun nop/1),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
VQ9 = variable_queue_publish(false, 1, VQ8),
@@ -2081,7 +2083,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
VQ5 = rabbit_variable_queue:idle_timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
- fun nop/1, fun nop/1),
+ fun nop/2, fun nop/1),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
@@ -2112,7 +2114,7 @@ test_queue_recover() ->
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
VQ1 = rabbit_variable_queue:init(QName, true, true,
- fun nop/1, fun nop/1),
+ fun nop/2, fun nop/1),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
_VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
@@ -2174,3 +2176,4 @@ test_configurable_server_properties() ->
passed.
nop(_) -> ok.
+nop(_, _) -> ok.
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index fc00976a..b5ff2b12 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -28,7 +28,7 @@
-rabbit_upgrade({hash_passwords, []}).
-rabbit_upgrade({add_ip_to_listener, []}).
-rabbit_upgrade({internal_exchanges, []}).
--rabbit_upgrade({user_to_internal_user, []}).
+-rabbit_upgrade({user_to_internal_user, [hash_passwords]}).
%% -------------------------------------------------------------------
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 565c61e7..665cac96 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -412,7 +412,9 @@ stop_msg_store() ->
init(QueueName, IsDurable, Recover) ->
Self = self(),
init(QueueName, IsDurable, Recover,
- fun (Guids) -> msgs_written_to_disk(Self, Guids) end,
+ fun (Guids, ActionTaken) ->
+ msgs_written_to_disk(Self, Guids, ActionTaken)
+ end,
fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end).
init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) ->
@@ -519,7 +521,9 @@ publish(Msg, MsgProps, State) ->
{_SeqId, State1} = publish(Msg, MsgProps, false, false, State),
a(reduce_memory_use(State1)).
-publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) ->
+publish_delivered(false, #basic_message { guid = Guid },
+ _MsgProps, State = #vqstate { len = 0 }) ->
+ blind_confirm(self(), gb_sets:singleton(Guid)),
{blank_ack, a(State)};
publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
guid = Guid },
@@ -531,20 +535,20 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
in_counter = InCount,
persistent_count = PCount,
durable = IsDurable,
- unconfirmed = Unconfirmed }) ->
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = true },
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
- Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC),
{SeqId, a(reduce_memory_use(
State2 #vqstate { next_seq_id = SeqId + 1,
out_counter = OutCount + 1,
in_counter = InCount + 1,
persistent_count = PCount1,
- unconfirmed = Unconfirmed1 }))}.
+ unconfirmed = UC1 }))}.
dropwhile(Pred, State) ->
{_OkOrEmpty, State1} = dropwhile1(Pred, State),
@@ -654,15 +658,9 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
persistent_count = PCount1 })}.
ack(AckTags, State) ->
- {Guids, State1} =
- ack(fun msg_store_remove/3,
- fun ({_IsPersistent, Guid, _MsgProps}, State1) ->
- remove_confirms(gb_sets:singleton(Guid), State1);
- (#msg_status{msg = #basic_message { guid = Guid }}, State1) ->
- remove_confirms(gb_sets:singleton(Guid), State1)
- end,
- AckTags, State),
- {Guids, a(State1)}.
+ a(ack(fun msg_store_remove/3,
+ fun (_, State0) -> State0 end,
+ AckTags, State)).
tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps,
State = #vqstate { durable = IsDurable,
@@ -712,7 +710,7 @@ tx_commit(Txn, Fun, MsgPropsFun,
end)}.
requeue(AckTags, MsgPropsFun, State) ->
- {_Guids, State1} =
+ a(reduce_memory_use(
ack(fun msg_store_release/3,
fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
{_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps),
@@ -727,8 +725,7 @@ requeue(AckTags, MsgPropsFun, State) ->
true, true, State2),
State3
end,
- AckTags, State),
- a(reduce_memory_use(State1)).
+ AckTags, State))).
len(#vqstate { len = Len }) -> Len.
@@ -812,17 +809,22 @@ ram_duration(State = #vqstate {
ram_msg_count_prev = RamMsgCount,
ram_ack_count_prev = RamAckCount }}.
-needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) ->
- {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> {0, State1} end,
- fun (_Quota, State1) -> State1 end,
- fun (State1) -> State1 end,
- fun (_Quota, State1) -> {0, State1} end,
- State),
- Res;
-needs_idle_timeout(_State) ->
- true.
+needs_idle_timeout(State = #vqstate { on_sync = OnSync }) ->
+ case {OnSync, needs_index_sync(State)} of
+ {?BLANK_SYNC, false} ->
+ {Res, _State} = reduce_memory_use(
+ fun (_Quota, State1) -> {0, State1} end,
+ fun (_Quota, State1) -> State1 end,
+ fun (State1) -> State1 end,
+ fun (_Quota, State1) -> {0, State1} end,
+ State),
+ Res;
+ _ ->
+ true
+ end.
-idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))).
+idle_timeout(State) ->
+ a(reduce_memory_use(confirm_commit_index(tx_commit_index(State)))).
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
@@ -1160,7 +1162,6 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
durable = IsDurable }) ->
PAcks = lists:append(SPAcks),
Acks = lists:append(SAcks),
- {_Guids, NewState} = ack(Acks, State),
Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs),
{Msg, MsgProps} <- lists:reverse(PubsN)],
{SeqIds, State1 = #vqstate { index_state = IndexState }} =
@@ -1172,7 +1173,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
{SeqId, State3} =
publish(Msg, MsgProps, false, IsPersistent1, State2),
{cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
- end, {PAcks, NewState}, Pubs),
+ end, {PAcks, ack(Acks, State)}, Pubs),
IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
[ Fun() || Fun <- lists:reverse(SFuns) ],
reduce_memory_use(
@@ -1236,7 +1237,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
persistent_count = PCount,
durable = IsDurable,
ram_msg_count = RamMsgCount,
- unconfirmed = Unconfirmed }) ->
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk},
@@ -1246,13 +1247,13 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) }
end,
PCount1 = PCount + one_if(IsPersistent1),
- Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, Guid, UC),
{SeqId, State2 #vqstate { next_seq_id = SeqId + 1,
len = Len + 1,
in_counter = InCount + 1,
persistent_count = PCount1,
ram_msg_count = RamMsgCount + 1,
- unconfirmed = Unconfirmed1 }}.
+ unconfirmed = UC1 }}.
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
msg_on_disk = true }, _MSCState) ->
@@ -1323,7 +1324,7 @@ remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
- {PersistentSeqIds, GuidsByStore, _AllGuids} =
+ {PersistentSeqIds, GuidsByStore} =
dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA),
State1 = State #vqstate { pending_ack = dict:new(),
ram_ack_index = gb_trees:empty() },
@@ -1342,9 +1343,9 @@ remove_pending_ack(KeepPersistent,
end.
ack(_MsgStoreFun, _Fun, [], State) ->
- {[], State};
+ State;
ack(MsgStoreFun, Fun, AckTags, State) ->
- {{PersistentSeqIds, GuidsByStore, AllGuids},
+ {{PersistentSeqIds, GuidsByStore},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
persistent_count = PCount,
@@ -1364,24 +1365,21 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
|| {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)],
PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
orddict:new(), GuidsByStore)),
- {lists:reverse(AllGuids),
- State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1,
- ack_out_counter = AckOutCount + length(AckTags) }}.
+ State1 #vqstate { index_state = IndexState1,
+ persistent_count = PCount1,
+ ack_out_counter = AckOutCount + length(AckTags) }.
-accumulate_ack_init() -> {[], orddict:new(), []}.
+accumulate_ack_init() -> {[], orddict:new()}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
- index_on_disk = false,
- guid = Guid },
- {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) ->
- {PersistentSeqIdsAcc, GuidsByStore, [Guid | AllGuids]};
+ index_on_disk = false },
+ {PersistentSeqIdsAcc, GuidsByStore}) ->
+ {PersistentSeqIdsAcc, GuidsByStore};
accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps},
- {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) ->
+ {PersistentSeqIdsAcc, GuidsByStore}) ->
{cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc),
- rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore),
- [Guid | AllGuids]}.
+ rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore)}.
find_persistent_count(LensByStore) ->
case orddict:find(true, LensByStore) of
@@ -1393,6 +1391,13 @@ find_persistent_count(LensByStore) ->
%% Internal plumbing for confirms (aka publisher acks)
%%----------------------------------------------------------------------------
+confirm_commit_index(State = #vqstate { index_state = IndexState }) ->
+ case needs_index_sync(State) of
+ true -> State #vqstate {
+ index_state = rabbit_queue_index:sync(IndexState) };
+ false -> State
+ end.
+
remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->
@@ -1400,10 +1405,31 @@ remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = gb_sets:difference(MIOD, GuidSet),
unconfirmed = gb_sets:difference(UC, GuidSet) }.
+needs_index_sync(#vqstate { msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ %% If UC is empty then by definition, MIOD and MOD are also empty
+ %% and there's nothing that can be pending a sync.
+
+ %% If UC is not empty, then we want to find is_empty(UC - MIOD),
+ %% but the subtraction can be expensive. Thus instead, we test to
+ %% see if UC is a subset of MIOD. This can only be the case if
+ %% MIOD == UC, which would indicate that every message in UC is
+ %% also in MIOD and is thus _all_ pending on a msg_store sync, not
+ %% on a qi sync. Thus the negation of this is sufficient. Because
+ %% is_subset is short circuiting, this is more efficient than the
+ %% subtraction.
+ not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)).
+
msgs_confirmed(GuidSet, State) ->
{gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}.
-msgs_written_to_disk(QPid, GuidSet) ->
+blind_confirm(QPid, GuidSet) ->
+ rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
+ QPid, fun (State) -> msgs_confirmed(GuidSet, State) end).
+
+msgs_written_to_disk(QPid, GuidSet, removed) ->
+ blind_confirm(QPid, GuidSet);
+msgs_written_to_disk(QPid, GuidSet, written) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
QPid, fun (State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index f939a3fe..16ae193a 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -53,37 +53,40 @@ add(VHostPath) ->
R = rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_vhost, VHostPath}) of
- [] ->
- ok = mnesia:write(rabbit_vhost,
- #vhost{virtual_host = VHostPath},
- write),
- [rabbit_exchange:declare(
- rabbit_misc:r(VHostPath, exchange, Name),
- Type, true, false, false, []) ||
- {Name,Type} <-
- [{<<"">>, direct},
- {<<"amq.direct">>, direct},
- {<<"amq.topic">>, topic},
- {<<"amq.match">>, headers}, %% per 0-9-1 pdf
- {<<"amq.headers">>, headers}, %% per 0-9-1 xml
- {<<"amq.fanout">>, fanout}]],
- ok;
- [_] ->
- mnesia:abort({vhost_already_exists, VHostPath})
+ [] -> ok = mnesia:write(rabbit_vhost,
+ #vhost{virtual_host = VHostPath},
+ write);
+ [_] -> mnesia:abort({vhost_already_exists, VHostPath})
end
+ end,
+ fun (ok, true) ->
+ ok;
+ (ok, false) ->
+ [rabbit_exchange:declare(
+ rabbit_misc:r(VHostPath, exchange, Name),
+ Type, true, false, false, []) ||
+ {Name,Type} <-
+ [{<<"">>, direct},
+ {<<"amq.direct">>, direct},
+ {<<"amq.topic">>, topic},
+ {<<"amq.match">>, headers}, %% per 0-9-1 pdf
+ {<<"amq.headers">>, headers}, %% per 0-9-1 xml
+ {<<"amq.fanout">>, fanout}]],
+ ok
end),
rabbit_log:info("Added vhost ~p~n", [VHostPath]),
R.
delete(VHostPath) ->
- %%FIXME: We are forced to delete the queues outside the TX below
- %%because queue deletion involves sending messages to the queue
- %%process, which in turn results in further mnesia actions and
- %%eventually the termination of that process.
- lists:foreach(fun (Q) ->
- {ok,_} = rabbit_amqqueue:delete(Q, false, false)
- end,
- rabbit_amqqueue:list(VHostPath)),
+ %% FIXME: We are forced to delete the queues and exchanges outside
+ %% the TX below. Queue deletion involves sending messages to the queue
+ %% process, which in turn results in further mnesia actions and
+ %% eventually the termination of that process. Exchange deletion causes
+ %% notifications which must be sent outside the TX
+ [{ok,_} = rabbit_amqqueue:delete(Q, false, false) ||
+ Q <- rabbit_amqqueue:list(VHostPath)],
+ [ok = rabbit_exchange:delete(Name, false) ||
+ #exchange{name = Name} <- rabbit_exchange:list(VHostPath)],
R = rabbit_misc:execute_mnesia_transaction(
with(VHostPath, fun () ->
ok = internal_delete(VHostPath)
@@ -92,10 +95,6 @@ delete(VHostPath) ->
R.
internal_delete(VHostPath) ->
- lists:foreach(fun (#exchange{name = Name}) ->
- ok = rabbit_exchange:delete(Name, false)
- end,
- rabbit_exchange:list(VHostPath)),
lists:foreach(
fun ({Username, _, _, _}) ->
ok = rabbit_auth_backend_internal:clear_permissions(Username,