summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile3
-rw-r--r--docs/html-to-website-xml.xsl11
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--include/rabbit.hrl5
-rw-r--r--include/rabbit_backing_queue_spec.hrl3
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rw-r--r--packaging/macports/Portfile.in14
-rwxr-xr-xscripts/rabbitmq-env2
-rwxr-xr-xscripts/rabbitmq-multi3
-rwxr-xr-xscripts/rabbitmq-server6
-rw-r--r--scripts/rabbitmq-server.bat8
-rw-r--r--scripts/rabbitmq-service.bat8
-rwxr-xr-xscripts/rabbitmqctl3
-rw-r--r--src/rabbit.erl9
-rw-r--r--src/rabbit_amqqueue.erl30
-rw-r--r--src/rabbit_amqqueue_process.erl176
-rw-r--r--src/rabbit_basic.erl14
-rw-r--r--src/rabbit_binary_generator.erl2
-rw-r--r--src/rabbit_binary_parser.erl2
-rw-r--r--src/rabbit_binding.erl6
-rw-r--r--src/rabbit_channel.erl261
-rw-r--r--src/rabbit_control.erl41
-rw-r--r--src/rabbit_exchange.erl2
-rw-r--r--src/rabbit_invariable_queue.erl314
-rw-r--r--src/rabbit_log.erl3
-rw-r--r--src/rabbit_misc.erl50
-rw-r--r--src/rabbit_mnesia.erl25
-rw-r--r--src/rabbit_msg_store.erl199
-rw-r--r--src/rabbit_net.erl85
-rw-r--r--src/rabbit_persister.erl496
-rw-r--r--src/rabbit_prelaunch.erl (renamed from src/rabbit_plugin_activator.erl)43
-rw-r--r--src/rabbit_queue_index.erl85
-rw-r--r--src/rabbit_reader.erl5
-rw-r--r--src/rabbit_router.erl9
-rw-r--r--src/rabbit_tests.erl49
-rw-r--r--src/rabbit_types.erl3
-rw-r--r--src/rabbit_upgrade.erl40
-rw-r--r--src/rabbit_variable_queue.erl378
-rw-r--r--src/rabbit_writer.erl59
40 files changed, 1081 insertions, 1381 deletions
diff --git a/Makefile b/Makefile
index d3f052f6..e0d5744c 100644
--- a/Makefile
+++ b/Makefile
@@ -178,9 +178,6 @@ start-rabbit-on-node: all
stop-rabbit-on-node: all
echo "rabbit:stop()." | $(ERL_CALL)
-force-snapshot: all
- echo "rabbit_persister:force_snapshot()." | $(ERL_CALL)
-
set-memory-alarm: all
echo "alarm_handler:set_alarm({vm_memory_high_watermark, []})." | \
$(ERL_CALL)
diff --git a/docs/html-to-website-xml.xsl b/docs/html-to-website-xml.xsl
index ec8f87e5..4bfcf6ca 100644
--- a/docs/html-to-website-xml.xsl
+++ b/docs/html-to-website-xml.xsl
@@ -1,6 +1,7 @@
<?xml version='1.0'?>
<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
xmlns:doc="http://www.rabbitmq.com/namespaces/ad-hoc/doc"
+ xmlns="http://www.w3.org/1999/xhtml"
version='1.0'>
<xsl:param name="original"/>
@@ -10,10 +11,16 @@
<xsl:template match="*"/>
<!-- Copy every element through -->
-<xsl:template match="@*|node()">
- <xsl:copy><xsl:apply-templates select="@*|node()"/></xsl:copy>
+<xsl:template match="*">
+ <xsl:element name="{name()}" namespace="http://www.w3.org/1999/xhtml">
+ <xsl:apply-templates select="@*|node()"/>
+ </xsl:element>
</xsl:template>
+<xsl:template match="@*">
+ <xsl:copy/>
+</xsl:template>
+
<!-- Copy the root node, and munge the outer part of the page -->
<xsl:template match="/html">
<xsl:processing-instruction name="xml-stylesheet">type="text/xml" href="page.xsl"</xsl:processing-instruction>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 9315047c..3888f198 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -6,7 +6,6 @@
{registered, [rabbit_amqqueue_sup,
rabbit_log,
rabbit_node_monitor,
- rabbit_persister,
rabbit_router,
rabbit_sup,
rabbit_tcp_client_sup]},
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index a1987fb2..fccfad97 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -69,12 +69,13 @@
is_persistent}).
-record(ssl_socket, {tcp, ssl}).
--record(delivery, {mandatory, immediate, txn, sender, message}).
+-record(delivery, {mandatory, immediate, txn, sender, message,
+ msg_seq_no}).
-record(amqp_error, {name, explanation = "", method = none}).
-record(event, {type, props, timestamp}).
--record(message_properties, {expiry}).
+-record(message_properties, {expiry, needs_confirming = false}).
%%----------------------------------------------------------------------------
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 20230b24..f67c6f46 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -37,6 +37,7 @@
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
-type(ack_required() :: boolean()).
+-type(confirm_required() :: boolean()).
-type(message_properties_transformer() ::
fun ((rabbit_types:message_properties())
-> rabbit_types:message_properties())).
@@ -57,7 +58,7 @@
(fun ((rabbit_types:message_properties()) -> boolean()), state())
-> state()).
-spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}).
--spec(ack/2 :: ([ack()], state()) -> state()).
+-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
-spec(tx_publish/4 :: (rabbit_types:txn(), rabbit_types:basic_message(),
rabbit_types:message_properties(), state()) -> state()).
-spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()).
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 5032f471..b37f7ab1 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -124,6 +124,9 @@ done
rm -rf %{buildroot}
%changelog
+* Mon Nov 29 2010 rob@rabbitmq.com 2.2.0-1
+- New Upstream Release
+
* Tue Oct 19 2010 vlad@rabbitmq.com 2.1.1-1
- New Upstream Release
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index e81fda24..a60e691d 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (2.2.0-1) lucid; urgency=low
+
+ * New Upstream Release
+
+ -- Rob Harrop <rob@rabbitmq.com> Mon, 29 Nov 2010 12:24:48 +0000
+
rabbitmq-server (2.1.1-1) lucid; urgency=low
* New Upstream Release
diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in
index ce6b1e34..f8417b83 100644
--- a/packaging/macports/Portfile.in
+++ b/packaging/macports/Portfile.in
@@ -7,6 +7,8 @@ version @VERSION@
categories net
maintainers paperplanes.de:meyer rabbitmq.com:tonyg openmaintainer
platforms darwin
+supported_archs noarch
+
description The RabbitMQ AMQP Server
long_description \
RabbitMQ is an implementation of AMQP, the emerging standard for \
@@ -31,17 +33,13 @@ checksums \
depends_lib port:erlang
depends_build port:libxslt
-platform darwin 7 {
- depends_build-append port:py25-simplejson
- build.args PYTHON=${prefix}/bin/python2.5
-}
platform darwin 8 {
- depends_build-append port:py25-simplejson
- build.args PYTHON=${prefix}/bin/python2.5
+ depends_build-append port:py26-simplejson
+ build.args PYTHON=${prefix}/bin/python2.6
}
platform darwin 9 {
- depends_build-append port:py25-simplejson
- build.args PYTHON=${prefix}/bin/python2.5
+ depends_build-append port:py26-simplejson
+ build.args PYTHON=${prefix}/bin/python2.6
}
# no need for simplejson on Snow Leopard or higher
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index 36734874..8cb470d0 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -48,6 +48,8 @@ done
SCRIPT_DIR=`dirname $SCRIPT_PATH`
RABBITMQ_HOME="${SCRIPT_DIR}/.."
+[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname`
+NODENAME=rabbit@${HOSTNAME%%.*}
# Load configuration from the rabbitmq.conf file
[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf
diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi
index 59050692..33883702 100755
--- a/scripts/rabbitmq-multi
+++ b/scripts/rabbitmq-multi
@@ -29,8 +29,7 @@
##
## Contributor(s): ______________________________________.
##
-[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s`
-NODENAME=rabbit@${HOSTNAME%%.*}
+
SCRIPT_HOME=$(dirname $0)
PIDS_FILE=/var/lib/rabbitmq/pids
MULTI_ERL_ARGS=
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index c5d883c3..5b291290 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -30,8 +30,6 @@
## Contributor(s): ______________________________________.
##
-[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s`
-NODENAME=rabbit@${HOSTNAME%%.*}
SERVER_ERL_ARGS="+K true +A30 +P 1048576 \
-kernel inet_default_listen_options [{nodelay,true}] \
-kernel inet_default_connect_options [{nodelay,true}]"
@@ -91,8 +89,8 @@ if [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then
-pa "$RABBITMQ_EBIN_ROOT" \
-noinput \
-hidden \
- -s rabbit_plugin_activator \
- -extra "$RABBITMQ_PLUGINS_DIR" "${RABBITMQ_PLUGINS_EXPAND_DIR}"
+ -s rabbit_prelaunch \
+ -extra "$RABBITMQ_PLUGINS_DIR" "${RABBITMQ_PLUGINS_EXPAND_DIR}" "${RABBITMQ_NODENAME}"
then
RABBITMQ_BOOT_FILE="${RABBITMQ_PLUGINS_EXPAND_DIR}/rabbit"
RABBITMQ_EBIN_PATH=""
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 94180de9..872c87e3 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -117,13 +117,13 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
"!ERLANG_HOME!\bin\erl.exe" ^
-pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
--s rabbit_plugin_activator ^
+-s rabbit_prelaunch ^
-extra "!RABBITMQ_PLUGINS_DIR:\=/!" ^
- "!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"
+ "!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!" ^
+ "!RABBITMQ_NODENAME!"
set RABBITMQ_BOOT_FILE=!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit
-if not exist "!RABBITMQ_BOOT_FILE!.boot" (
- echo Custom Boot File "!RABBITMQ_BOOT_FILE!.boot" is missing.
+if ERRORLEVEL 1 (
exit /B 1
)
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 2c96b6fd..d2592931 100644
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -186,13 +186,13 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin
"!ERLANG_HOME!\bin\erl.exe" ^
-pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
--s rabbit_plugin_activator ^
+-s rabbit_prelaunch ^
-extra "!RABBITMQ_PLUGINS_DIR:\=/!" ^
- "!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"
+ "!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!" ^
+ ""
set RABBITMQ_BOOT_FILE=!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit
-if not exist "!RABBITMQ_BOOT_FILE!.boot" (
- echo Custom Boot File "!RABBITMQ_BOOT_FILE!.boot" is missing.
+if ERRORLEVEL 1 (
exit /B 1
)
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index 76ce25fd..56cff891 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -30,9 +30,6 @@
## Contributor(s): ______________________________________.
##
-[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s`
-NODENAME=rabbit@${HOSTNAME%%.*}
-
. `dirname $0`/rabbitmq-env
[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
diff --git a/src/rabbit.erl b/src/rabbit.erl
index e6657b32..2ebfdecf 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -170,12 +170,6 @@
%%---------------------------------------------------------------------------
--import(application).
--import(mnesia).
--import(lists).
--import(inet).
--import(gen_tcp).
-
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
@@ -294,7 +288,8 @@ run_boot_step({StepName, Attributes}) ->
[try
apply(M,F,A)
catch
- _:Reason -> boot_error("FAILED~nReason: ~p~n", [Reason])
+ _:Reason -> boot_error("FAILED~nReason: ~p~nStacktrace: ~p~n",
+ [Reason, erlang:get_stacktrace()])
end || {M,F,A} <- MFAs],
io:format("done~n"),
ok
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 5cdd0e3c..775c631d 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -34,6 +34,7 @@
-export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]).
-export([internal_declare/2, internal_delete/1,
maybe_run_queue_via_backing_queue/2,
+ maybe_run_queue_via_backing_queue_async/2,
update_ram_duration/1, set_ram_duration_target/2,
set_maximum_since_use/2, maybe_expire/1, drop_expired/1]).
-export([pseudo_queue/2]).
@@ -48,11 +49,6 @@
-export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
--import(mnesia).
--import(gen_server2).
--import(lists).
--import(queue).
-
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
@@ -156,7 +152,9 @@
(name()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit()).
-spec(maybe_run_queue_via_backing_queue/2 ::
- (pid(), (fun ((A) -> A))) -> 'ok').
+ (pid(), (fun ((A) -> A | {any(), A}))) -> 'ok').
+-spec(maybe_run_queue_via_backing_queue_async/2 ::
+ (pid(), (fun ((A) -> A | {any(), A}))) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
@@ -279,7 +277,7 @@ assert_equivalence(#amqqueue{durable = Durable,
assert_equivalence(#amqqueue{name = QueueName},
_Durable, _AutoDelete, _RequiredArgs, _Owner) ->
rabbit_misc:protocol_error(
- not_allowed, "parameters for ~s not equivalent",
+ precondition_failed, "parameters for ~s not equivalent",
[rabbit_misc:rs(QueueName)]).
check_exclusive_access(Q, Owner) -> check_exclusive_access(Q, Owner, lax).
@@ -380,16 +378,13 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge, infinity).
-deliver(QPid, #delivery{immediate = true,
- txn = Txn, sender = ChPid, message = Message}) ->
- gen_server2:call(QPid, {deliver_immediately, Txn, Message, ChPid},
- infinity);
-deliver(QPid, #delivery{mandatory = true,
- txn = Txn, sender = ChPid, message = Message}) ->
- gen_server2:call(QPid, {deliver, Txn, Message, ChPid}, infinity),
+deliver(QPid, Delivery = #delivery{immediate = true}) ->
+ gen_server2:call(QPid, {deliver_immediately, Delivery}, infinity);
+deliver(QPid, Delivery = #delivery{mandatory = true}) ->
+ gen_server2:call(QPid, {deliver, Delivery}, infinity),
true;
-deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) ->
- gen_server2:cast(QPid, {deliver, Txn, Message, ChPid}),
+deliver(QPid, Delivery) ->
+ gen_server2:cast(QPid, {deliver, Delivery}),
true.
requeue(QPid, MsgIds, ChPid) ->
@@ -466,6 +461,9 @@ internal_delete(QueueName) ->
maybe_run_queue_via_backing_queue(QPid, Fun) ->
gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity).
+maybe_run_queue_via_backing_queue_async(QPid, Fun) ->
+ gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Fun}).
+
update_ram_duration(QPid) ->
gen_server2:cast(QPid, update_ram_duration).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a999fe58..78bb6835 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -39,7 +39,8 @@
-define(SYNC_INTERVAL, 5). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
--define(BASE_MESSAGE_PROPERTIES, #message_properties{expiry = undefined}).
+-define(BASE_MESSAGE_PROPERTIES,
+ #message_properties{expiry = undefined, needs_confirming = false}).
-export([start_link/1, info_keys/0]).
@@ -47,10 +48,6 @@
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
prioritise_cast/2, prioritise_info/2]).
--import(queue).
--import(erlang).
--import(lists).
-
% Queue's state
-record(q, {q,
exclusive_consumer,
@@ -64,6 +61,7 @@
rate_timer_ref,
expiry_timer_ref,
stats_timer,
+ guid_to_channel,
ttl,
ttl_timer_ref
}).
@@ -128,7 +126,8 @@ init(Q) ->
rate_timer_ref = undefined,
expiry_timer_ref = undefined,
ttl = undefined,
- stats_timer = rabbit_event:init_stats_timer()}, hibernate,
+ stats_timer = rabbit_event:init_stats_timer(),
+ guid_to_channel = dict:new()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(shutdown, State = #q{backing_queue = BQ}) ->
@@ -373,11 +372,13 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
- ChAckTags1 = case AckRequired of
- true -> sets:add_element(
- AckTag, ChAckTags);
- false -> ChAckTags
- end,
+ {State2, ChAckTags1} =
+ case AckRequired of
+ true -> {State1,
+ sets:add_element(AckTag, ChAckTags)};
+ false -> {confirm_message(Message, State1),
+ ChAckTags}
+ end,
NewC = C#cr{unsent_message_count = Count + 1,
acktags = ChAckTags1},
true = maybe_store_ch_record(NewC),
@@ -393,10 +394,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
{ActiveConsumers1,
queue:in(QEntry, BlockedConsumers1)}
end,
- State2 = State1#q{
+ State3 = State2#q{
active_consumers = NewActiveConsumers,
blocked_consumers = NewBlockedConsumers},
- deliver_msgs_to_consumers(Funs, FunAcc1, State2);
+ deliver_msgs_to_consumers(Funs, FunAcc1, State3);
%% if IsMsgReady then we've hit the limiter
false when IsMsgReady ->
true = maybe_store_ch_record(C#cr{is_limit_active = true}),
@@ -424,6 +425,39 @@ deliver_from_queue_deliver(AckRequired, false, State) ->
fetch(AckRequired, State),
{{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
+confirm_messages(Guids, State) ->
+ lists:foldl(fun confirm_message_by_guid/2, State, Guids).
+
+confirm_message_by_guid(Guid, State = #q{guid_to_channel = GTC}) ->
+ case dict:find(Guid, GTC) of
+ {ok, {_ , undefined}} -> ok;
+ {ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo);
+ _ -> ok
+ end,
+ State#q{guid_to_channel = dict:erase(Guid, GTC)}.
+
+confirm_message(#basic_message{guid = Guid}, State) ->
+ confirm_message_by_guid(Guid, State).
+
+record_confirm_message(#delivery{msg_seq_no = undefined}, State) ->
+ State;
+record_confirm_message(#delivery{sender = ChPid,
+ msg_seq_no = MsgSeqNo,
+ message = #basic_message {
+ is_persistent = true,
+ guid = Guid}},
+ State =
+ #q{guid_to_channel = GTC,
+ q = #amqqueue{durable = true}}) ->
+ State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)};
+record_confirm_message(_Delivery, State) ->
+ State.
+
+ack_by_acktags(AckTags, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {AckdGuids, BQS1} = BQ:ack(AckTags, BQS),
+ confirm_messages(AckdGuids, State#q{backing_queue_state = BQS1}).
+
run_message_queue(State) ->
Funs = {fun deliver_from_queue_pred/2,
fun deliver_from_queue_deliver/3},
@@ -433,7 +467,17 @@ run_message_queue(State) ->
{_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1),
State2.
-attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) ->
+attempt_delivery(#delivery{txn = none,
+ sender = ChPid,
+ message = Message,
+ msg_seq_no = MsgSeqNo},
+ State = #q{backing_queue = BQ, q = Q}) ->
+ NeedsConfirming = Message#basic_message.is_persistent andalso
+ Q#amqqueue.durable,
+ case NeedsConfirming of
+ false -> rabbit_channel:confirm(ChPid, MsgSeqNo);
+ _ -> ok
+ end,
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) ->
@@ -441,29 +485,36 @@ attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) ->
%% not being enqueued, so we use an empty
%% message_properties.
{AckTag, BQS1} =
- BQ:publish_delivered(AckRequired, Message,
- ?BASE_MESSAGE_PROPERTIES, BQS),
+ BQ:publish_delivered(
+ AckRequired, Message,
+ (?BASE_MESSAGE_PROPERTIES)#message_properties{
+ needs_confirming = NeedsConfirming},
+ BQS),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
end,
deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State);
-attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+attempt_delivery(#delivery{txn = Txn,
+ sender = ChPid,
+ message = Message},
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
record_current_channel_tx(ChPid, Txn),
{true,
State#q{backing_queue_state =
BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}.
-deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
- case attempt_delivery(Txn, ChPid, Message, State) of
- {true, NewState} ->
- {true, NewState};
- {false, NewState} ->
- %% Txn is none and no unblocked channels with consumers
- BQS = BQ:publish(Message,
- message_properties(State),
- State #q.backing_queue_state),
- {false, ensure_ttl_timer(NewState#q{backing_queue_state = BQS})}
+deliver_or_enqueue(Delivery, State) ->
+ case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of
+ {true, State1} ->
+ {true, State1};
+ {false, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
+ #delivery{message = Message, msg_seq_no = MsgSeqNo} = Delivery,
+ BQS1 = BQ:publish(Message,
+ (message_properties(State)) #message_properties{
+ needs_confirming = (MsgSeqNo =/= undefined)},
+ BQS),
+ {false, ensure_ttl_timer(State1#q{backing_queue_state = BQS1})}
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
@@ -566,7 +617,12 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
- run_message_queue(State#q{backing_queue_state = Fun(BQS)}).
+ {BQS2, State1} =
+ case Fun(BQS) of
+ {{confirm, Guids}, BQS1} -> {BQS1, confirm_messages(Guids, State)};
+ BQS1 -> {BQS1, State}
+ end,
+ run_message_queue(State1#q{backing_queue_state = BQS2}).
commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
backing_queue_state = BQS,
@@ -745,7 +801,8 @@ handle_call(consumers, _From,
[{ChPid, ConsumerTag, AckRequired} | Acc]
end, [], queue:join(ActiveConsumers, BlockedConsumers)), State);
-handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) ->
+handle_call({deliver_immediately, Delivery = #delivery{message = Message}},
+ _From, State) ->
%% Synchronous, "immediate" delivery mode
%%
%% FIXME: Is this correct semantics?
@@ -759,12 +816,16 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) ->
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
- {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State),
- reply(Delivered, NewState);
-
-handle_call({deliver, Txn, Message, ChPid}, _From, State) ->
+ {Delivered, State1} =
+ attempt_delivery(Delivery, record_confirm_message(Delivery, State)),
+ reply(Delivered, case Delivered of
+ true -> State1;
+ false -> confirm_message(Message, State1)
+ end);
+
+handle_call({deliver, Delivery}, _From, State) ->
%% Synchronous, "mandatory" delivery mode
- {Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
+ {Delivered, NewState} = deliver_or_enqueue(Delivery, State),
reply(Delivered, NewState);
handle_call({commit, Txn, ChPid}, From, State) ->
@@ -790,15 +851,18 @@ handle_call({basic_get, ChPid, NoAck}, _From,
{empty, State2} ->
reply(empty, State2);
{{Message, IsDelivered, AckTag, Remaining}, State2} ->
- case AckRequired of
- true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
- true = maybe_store_ch_record(
- C#cr{acktags = sets:add_element(AckTag,
- ChAckTags)});
- false -> ok
- end,
+ State3 =
+ case AckRequired of
+ true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
+ true = maybe_store_ch_record(
+ C#cr{acktags =
+ sets:add_element(AckTag,
+ ChAckTags)}),
+ State2;
+ false -> confirm_message(Message, State2)
+ end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
- reply({ok, Remaining, Msg}, State2)
+ reply({ok, Remaining, Msg}, State3)
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid,
@@ -911,9 +975,13 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
-handle_cast({deliver, Txn, Message, ChPid}, State) ->
+
+handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) ->
+ noreply(maybe_run_queue_via_backing_queue(Fun, State));
+
+handle_cast({deliver, Delivery}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
+ {_Delivered, NewState} = deliver_or_enqueue(Delivery, State),
noreply(NewState);
handle_cast({ack, Txn, AckTags, ChPid},
@@ -922,18 +990,21 @@ handle_cast({ack, Txn, AckTags, ChPid},
not_found ->
noreply(State);
C = #cr{acktags = ChAckTags} ->
- {C1, BQS1} =
+ {C1, State1} =
case Txn of
none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- {C#cr{acktags = ChAckTags1}, BQ:ack(AckTags, BQS)};
- _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)}
+ NewC = C#cr{acktags = ChAckTags1},
+ NewState = ack_by_acktags(AckTags, State),
+ {NewC, NewState};
+ _ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS),
+ {C#cr{txn = Txn},
+ State#q{backing_queue_state = BQS1}}
end,
maybe_store_ch_record(C1),
- noreply(State#q{backing_queue_state = BQS1})
+ noreply(State1)
end;
-handle_cast({reject, AckTags, Requeue, ChPid},
- State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
case lookup_ch(ChPid) of
not_found ->
noreply(State);
@@ -942,8 +1013,7 @@ handle_cast({reject, AckTags, Requeue, ChPid},
maybe_store_ch_record(C#cr{acktags = ChAckTags1}),
noreply(case Requeue of
true -> requeue_and_run(AckTags, State);
- false -> BQS1 = BQ:ack(AckTags, BQS),
- State #q { backing_queue_state = BQS1 }
+ false -> ack_by_acktags(AckTags, State)
end)
end;
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 38412982..1ac39b65 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -33,7 +33,7 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([publish/1, message/4, properties/1, delivery/4]).
+-export([publish/1, message/4, properties/1, delivery/5]).
-export([publish/4, publish/7]).
-export([build_content/2, from_content/1]).
-export([is_message_persistent/1]).
@@ -50,9 +50,10 @@
-spec(publish/1 ::
(rabbit_types:delivery()) -> publish_result()).
--spec(delivery/4 ::
+-spec(delivery/5 ::
(boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()),
- rabbit_types:message()) -> rabbit_types:delivery()).
+ rabbit_types:message(), undefined | integer()) ->
+ rabbit_types:delivery()).
-spec(message/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
properties_input(), binary()) ->
@@ -88,9 +89,9 @@ publish(Delivery = #delivery{
Other
end.
-delivery(Mandatory, Immediate, Txn, Message) ->
+delivery(Mandatory, Immediate, Txn, Message, MsgSeqNo) ->
#delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn,
- sender = self(), message = Message}.
+ sender = self(), message = Message, msg_seq_no = MsgSeqNo}.
build_content(Properties, BodyBin) ->
%% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
@@ -157,7 +158,8 @@ publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties,
BodyBin) ->
publish(delivery(Mandatory, Immediate, Txn,
message(ExchangeName, RoutingKeyBin,
- properties(Properties), BodyBin))).
+ properties(Properties), BodyBin),
+ undefined)).
is_message_persistent(#content{properties = #'P_basic'{
delivery_mode = Mode}}) ->
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index b2997ae2..a5297a70 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -49,8 +49,6 @@
-export([ensure_content_encoded/2, clear_encoded_content/1]).
-export([map_exception/3]).
--import(lists).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index ebf063f0..4b4358b4 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -36,8 +36,6 @@
-export([parse_table/1, parse_properties/2]).
-export([ensure_content_decoded/1, clear_decoded_content/1]).
--import(lists).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 5ba9616a..ccadf5af 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -350,10 +350,10 @@ group_bindings_fold(Fun, SrcName, Acc, Removed, Bindings) ->
group_bindings_fold(Fun, Fun(SrcName, Bindings, Acc), Removed).
maybe_auto_delete(XName, Bindings, Deletions) ->
- case rabbit_exchange:lookup(XName) of
- {error, not_found} ->
+ case mnesia:read(rabbit_exchange, XName) of
+ [] ->
add_deletion(XName, {undefined, not_deleted, Bindings}, Deletions);
- {ok, X} ->
+ [X] ->
add_deletion(XName, {X, not_deleted, Bindings},
case rabbit_exchange:maybe_auto_delete(X) of
not_deleted -> Deletions;
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 6bed63a3..4e9bd4b1 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -35,10 +35,10 @@
-behaviour(gen_server2).
--export([start_link/7, do/2, do/3, shutdown/1]).
--export([send_command/2, deliver/4, flushed/2]).
+-export([start_link/7, do/2, do/3, flush/1, shutdown/1]).
+-export([send_command/2, deliver/4, flushed/2, confirm/2, flush_confirms/1]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
--export([emit_stats/1, flush/1]).
+-export([emit_stats/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
@@ -48,7 +48,9 @@
start_limiter_fun, transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking, queue_collector_pid, stats_timer}).
+ consumer_mapping, blocking, queue_collector_pid, stats_timer,
+ confirm_enabled, publish_seqno, confirm_multiple, confirm_tref,
+ held_confirms, unconfirmed, queues_for_msg}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -70,6 +72,8 @@
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+-define(FLUSH_CONFIRMS_INTERVAL, 1000).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -86,12 +90,15 @@
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
+-spec(flush/1 :: (pid()) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
-spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(deliver/4 ::
(pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
-> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
+-spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok').
+-spec(flush_confirms/1 :: (pid()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
@@ -115,6 +122,9 @@ do(Pid, Method) ->
do(Pid, Method, Content) ->
gen_server2:cast(Pid, {method, Method, Content}).
+flush(Pid) ->
+ gen_server2:call(Pid, flush).
+
shutdown(Pid) ->
gen_server2:cast(Pid, terminate).
@@ -127,6 +137,12 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
+confirm(Pid, MsgSeqNo) ->
+ gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}).
+
+flush_confirms(Pid) ->
+ gen_server2:cast(Pid, flush_confirms).
+
list() ->
pg_local:get_members(rabbit_channels).
@@ -150,9 +166,6 @@ info_all(Items) ->
emit_stats(Pid) ->
gen_server2:cast(Pid, emit_stats).
-flush(Pid) ->
- gen_server2:call(Pid, flush).
-
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
@@ -177,7 +190,13 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid,
consumer_mapping = dict:new(),
blocking = dict:new(),
queue_collector_pid = CollectorPid,
- stats_timer = StatsTimer},
+ stats_timer = StatsTimer,
+ confirm_enabled = false,
+ publish_seqno = 0,
+ confirm_multiple = false,
+ held_confirms = gb_sets:new(),
+ unconfirmed = gb_sets:new(),
+ queues_for_msg = dict:new()},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
rabbit_event:if_enabled(StatsTimer,
fun() -> internal_emit_stats(State) end),
@@ -197,6 +216,9 @@ prioritise_cast(Msg, _State) ->
_ -> 0
end.
+handle_call(flush, _From, State) ->
+ reply(ok, State);
+
handle_call(info, _From, State) ->
reply(infos(?INFO_KEYS, State), State);
@@ -206,9 +228,6 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
-handle_call(flush, _From, State) ->
- reply(ok, State);
-
handle_call(_Request, _From, State) ->
noreply(State).
@@ -242,14 +261,24 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, Msg),
noreply(State);
-handle_cast({deliver, ConsumerTag, AckRequired, Msg},
+handle_cast({deliver, ConsumerTag, AckRequired,
+ Msg = {_QName, QPid, _MsgId, Redelivered,
+ #basic_message{exchange_name = ExchangeName,
+ routing_key = RoutingKey,
+ content = Content}}},
State = #ch{writer_pid = WriterPid,
next_tag = DeliveryTag}) ->
State1 = lock_message(AckRequired,
ack_record(DeliveryTag, ConsumerTag, Msg),
State),
- ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg),
- {_QName, QPid, _MsgId, _Redelivered, _Msg} = Msg,
+
+ M = #'basic.deliver'{consumer_tag = ConsumerTag,
+ delivery_tag = DeliveryTag,
+ redelivered = Redelivered,
+ exchange = ExchangeName#resource.name,
+ routing_key = RoutingKey},
+ rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content),
+
maybe_incr_stats([{QPid, 1}],
case AckRequired of
true -> deliver;
@@ -261,21 +290,38 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
internal_emit_stats(State),
{noreply,
State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)},
- hibernate}.
-
-handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) ->
+ hibernate};
+
+handle_cast(flush_confirms, State) ->
+ {noreply, internal_flush_confirms(State)};
+
+handle_cast({confirm, MsgSeqNo, From}, State) ->
+ {noreply, confirm(MsgSeqNo, From, State)}.
+
+handle_info({'DOWN', _MRef, process, QPid, _Reason},
+ State = #ch{queues_for_msg = QFM}) ->
+ State1 = dict:fold(
+ fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) ->
+ Qs = sets:del_element(QPid, QPids),
+ case sets:size(Qs) of
+ 0 -> confirm(Msg, QPid, State0);
+ _ -> State0#ch{queues_for_msg =
+ dict:store(Msg, Qs, QFM0)}
+ end
+ end, State, QFM),
erase_queue_stats(QPid),
- {noreply, queue_blocked(QPid, State), hibernate}.
+ {noreply, queue_blocked(QPid, State1), hibernate}.
-handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
+handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
+ State1 = internal_flush_confirms(State),
rabbit_event:if_enabled(StatsTimer,
fun () ->
internal_emit_stats(
State, [{idle_since, now()}])
end),
- {hibernate,
- State#ch{stats_timer = rabbit_event:stop_stats_timer(StatsTimer)}}.
+ StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer),
+ {hibernate, State1#ch{stats_timer = StatsTimer1}}.
terminate(_Reason, State = #ch{state = terminating}) ->
terminate(State);
@@ -420,6 +466,52 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
+confirm(undefined, _QPid, State) ->
+ State;
+confirm(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) ->
+ State;
+confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) ->
+ do_if_unconfirmed(MsgSeqNo, QPid,
+ fun(MSN, State1 = #ch{writer_pid = WriterPid}) ->
+ ok = rabbit_writer:send_command(
+ WriterPid, #'basic.ack'{
+ delivery_tag = MSN}),
+ State1
+ end, State);
+confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) ->
+ do_if_unconfirmed(MsgSeqNo, QPid,
+ fun(MSN, State1 = #ch{held_confirms = As}) ->
+ start_confirm_timer(
+ State1#ch{held_confirms = gb_sets:add(MSN, As)})
+ end, State).
+
+do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun,
+ State = #ch{unconfirmed = UC,
+ queues_for_msg = QFM}) ->
+ %% clears references to MsgSeqNo and does ConfirmFun
+ case gb_sets:is_element(MsgSeqNo, UC) of
+ true ->
+ Unconfirmed1 = gb_sets:delete(MsgSeqNo, UC),
+ case QPid of
+ undefined ->
+ ConfirmFun(MsgSeqNo, State#ch{unconfirmed = Unconfirmed1});
+ _ ->
+ {ok, Qs} = dict:find(MsgSeqNo, QFM),
+ Qs1 = sets:del_element(QPid, Qs),
+ case sets:size(Qs1) of
+ 0 -> ConfirmFun(MsgSeqNo,
+ State#ch{
+ queues_for_msg =
+ dict:erase(MsgSeqNo, QFM),
+ unconfirmed = Unconfirmed1});
+ _ -> State#ch{queues_for_msg =
+ dict:store(MsgSeqNo, Qs1, QFM)}
+ end
+ end;
+ false ->
+ State
+ end.
+
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};
@@ -442,9 +534,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory,
immediate = Immediate},
- Content, State = #ch{virtual_host = VHostPath,
- transaction_id = TxnKey,
- writer_pid = WriterPid}) ->
+ Content, State = #ch{virtual_host = VHostPath,
+ transaction_id = TxnKey,
+ confirm_enabled = ConfirmEnabled}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
@@ -452,6 +544,15 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
IsPersistent = is_message_persistent(DecodedContent),
+ {MsgSeqNo, State1}
+ = case ConfirmEnabled of
+ false -> {undefined, State};
+ true -> SeqNo = State#ch.publish_seqno,
+ {SeqNo,
+ State#ch{publish_seqno = SeqNo + 1,
+ unconfirmed =
+ gb_sets:add(SeqNo, State#ch.unconfirmed)}}
+ end,
Message = #basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = DecodedContent,
@@ -460,18 +561,16 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
Exchange,
- rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)),
- case RoutingRes of
- routed -> ok;
- unroutable -> ok = basic_return(Message, WriterPid, no_route);
- not_delivered -> ok = basic_return(Message, WriterPid, no_consumers)
- end,
+ rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message,
+ MsgSeqNo)),
+ State2 = process_routing_result(RoutingRes, DeliveredQPids,
+ MsgSeqNo, Message, State1),
maybe_incr_stats([{ExchangeName, 1} |
[{{QPid, ExchangeName}, 1} ||
- QPid <- DeliveredQPids]], publish, State),
+ QPid <- DeliveredQPids]], publish, State2),
{noreply, case TxnKey of
- none -> State;
- _ -> add_tx_participants(DeliveredQPids, State)
+ none -> State2;
+ _ -> add_tx_participants(DeliveredQPids, State2)
end};
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
@@ -860,6 +959,11 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
return_ok(State, NoWait,
#'queue.purge_ok'{message_count = PurgedMessageCount});
+
+handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "cannot switch from confirm to tx mode", []);
+
handle_method(#'tx.select'{}, _, State = #ch{transaction_id = none}) ->
{reply, #'tx.select_ok'{}, new_tx(State)};
@@ -880,6 +984,25 @@ handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) ->
handle_method(#'tx.rollback'{}, _, State) ->
{reply, #'tx.rollback_ok'{}, internal_rollback(State)};
+handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId})
+ when TxId =/= none ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "cannot switch from tx to confirm mode", []);
+
+handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait},
+ _, State = #ch{confirm_enabled = false}) ->
+ return_ok(State#ch{confirm_enabled = true, confirm_multiple = Multiple},
+ NoWait, #'confirm.select_ok'{});
+
+handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait},
+ _, State = #ch{confirm_enabled = true,
+ confirm_multiple = Multiple}) ->
+ return_ok(State, NoWait, #'confirm.select_ok'{});
+
+handle_method(#'confirm.select'{}, _, #ch{confirm_enabled = true}) ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "cannot change confirm_multiple setting", []);
+
handle_method(#'channel.flow'{active = true}, _,
State = #ch{limiter_pid = LimiterPid}) ->
LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of
@@ -1107,28 +1230,68 @@ is_message_persistent(Content) ->
IsPersistent
end.
+process_routing_result(unroutable, _, MsgSeqNo, Message, State) ->
+ ok = basic_return(Message, State#ch.writer_pid, no_route),
+ confirm(MsgSeqNo, undefined, State);
+process_routing_result(not_delivered, _, MsgSeqNo, Message, State) ->
+ ok = basic_return(Message, State#ch.writer_pid, no_consumers),
+ confirm(MsgSeqNo, undefined, State);
+process_routing_result(routed, [], MsgSeqNo, _, State) ->
+ confirm(MsgSeqNo, undefined, State);
+process_routing_result(routed, _, undefined, _, State) ->
+ State;
+process_routing_result(routed, QPids, MsgSeqNo, _,
+ State = #ch{queues_for_msg = QFM}) ->
+ QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM),
+ [maybe_monitor(QPid) || QPid <- QPids],
+ State#ch{queues_for_msg = QFM1}.
+
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};
lock_message(false, _MsgStruct, State) ->
State.
-internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag,
- {_QName, QPid, _MsgId, Redelivered,
- #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
- content = Content}}) ->
- M = #'basic.deliver'{consumer_tag = ConsumerTag,
- delivery_tag = DeliveryTag,
- redelivered = Redelivered,
- exchange = ExchangeName#resource.name,
- routing_key = RoutingKey},
- ok = case Notify of
- true -> rabbit_writer:send_command_and_notify(
- WriterPid, QPid, self(), M, Content);
- false -> rabbit_writer:send_command(WriterPid, M, Content)
- end.
+start_confirm_timer(State = #ch{confirm_tref = undefined}) ->
+ {ok, TRef} = timer:apply_after(?FLUSH_CONFIRMS_INTERVAL,
+ ?MODULE, flush_confirms, [self()]),
+ State#ch{confirm_tref = TRef};
+start_confirm_timer(State) ->
+ State.
+
+stop_confirm_timer(State = #ch{confirm_tref = undefined}) ->
+ State;
+stop_confirm_timer(State = #ch{confirm_tref = TRef}) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State#ch{confirm_tref = undefined}.
+
+internal_flush_confirms(State = #ch{writer_pid = WriterPid,
+ held_confirms = Cs}) ->
+ case gb_sets:is_empty(Cs) of
+ true -> State#ch{confirm_tref = undefined};
+ false -> [First | Rest] = gb_sets:to_list(Cs),
+ {Mult, Inds} = find_consecutive_sequence(First, Rest),
+ ok = rabbit_writer:send_command(
+ WriterPid,
+ #'basic.ack'{delivery_tag = Mult, multiple = true}),
+ ok = lists:foldl(
+ fun(T, ok) -> rabbit_writer:send_command(
+ WriterPid,
+ #'basic.ack'{delivery_tag = T})
+ end, ok, Inds),
+ State#ch{held_confirms = gb_sets:new(),
+ confirm_tref = undefined}
+ end.
+
+%% Find longest sequence of consecutive numbers at the beginning.
+find_consecutive_sequence(Last, []) ->
+ {Last, []};
+find_consecutive_sequence(Last, [N | Ns]) when N == (Last + 1) ->
+ find_consecutive_sequence(N, Ns);
+find_consecutive_sequence(Last, Ns) ->
+ {Last, Ns}.
-terminate(_State) ->
+terminate(State) ->
+ stop_confirm_timer(State),
pg_local:leave(rabbit_channels, self()),
rabbit_event:notify(channel_closed, [{pid, self()}]).
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 72b77b1f..360217a2 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -32,7 +32,7 @@
-module(rabbit_control).
-include("rabbit.hrl").
--export([start/0, stop/0, action/5]).
+-export([start/0, stop/0, action/5, diagnostics/1]).
-define(RPC_TIMEOUT, infinity).
@@ -50,6 +50,7 @@
(atom(), node(), [string()], [{string(), any()}],
fun ((string(), [any()]) -> 'ok'))
-> 'ok').
+-spec(diagnostics/1 :: (node()) -> [{string(), [any()]}]).
-spec(usage/0 :: () -> no_return()).
-endif.
@@ -116,24 +117,28 @@ fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args).
print_error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args).
print_badrpc_diagnostics(Node) ->
- fmt_stderr("diagnostics:", []),
+ [fmt_stderr(Fmt, Args) || {Fmt, Args} <- diagnostics(Node)].
+
+diagnostics(Node) ->
{_NodeName, NodeHost} = rabbit_misc:nodeparts(Node),
- case net_adm:names(NodeHost) of
- {error, EpmdReason} ->
- fmt_stderr("- unable to connect to epmd on ~s: ~w",
- [NodeHost, EpmdReason]);
- {ok, NamePorts} ->
- fmt_stderr("- nodes and their ports on ~s: ~p",
- [NodeHost, [{list_to_atom(Name), Port} ||
- {Name, Port} <- NamePorts]])
- end,
- fmt_stderr("- current node: ~w", [node()]),
- case init:get_argument(home) of
- {ok, [[Home]]} -> fmt_stderr("- current node home dir: ~s", [Home]);
- Other -> fmt_stderr("- no current node home dir: ~p", [Other])
- end,
- fmt_stderr("- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]),
- ok.
+ [
+ {"diagnostics:", []},
+ case net_adm:names(NodeHost) of
+ {error, EpmdReason} ->
+ {"- unable to connect to epmd on ~s: ~w",
+ [NodeHost, EpmdReason]};
+ {ok, NamePorts} ->
+ {"- nodes and their ports on ~s: ~p",
+ [NodeHost, [{list_to_atom(Name), Port} ||
+ {Name, Port} <- NamePorts]]}
+ end,
+ {"- current node: ~w", [node()]},
+ case init:get_argument(home) of
+ {ok, [[Home]]} -> {"- current node home dir: ~s", [Home]};
+ Other -> {"- no current node home dir: ~p", [Other]}
+ end,
+ {"- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]}
+ ].
stop() ->
ok.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 9cc70a26..7414c904 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -176,7 +176,7 @@ assert_equivalence(X = #exchange{ durable = Durable,
assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete,
_Args) ->
rabbit_misc:protocol_error(
- not_allowed,
+ precondition_failed,
"cannot redeclare ~s with different type, durable or autodelete value",
[rabbit_misc:rs(Name)]).
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
deleted file mode 100644
index 5a0532ea..00000000
--- a/src/rabbit_invariable_queue.erl
+++ /dev/null
@@ -1,314 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2010 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2010 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_invariable_queue).
-
--export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3,
- publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3,
- dropwhile/2, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1,
- set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1,
- idle_timeout/1, handle_pre_hibernate/1, status/1]).
-
--export([start/1, stop/0]).
-
--behaviour(rabbit_backing_queue).
-
--include("rabbit.hrl").
-
--record(iv_state, { queue, qname, durable, len, pending_ack }).
--record(tx, { pending_messages, pending_acks, is_persistent }).
-
--ifdef(use_specs).
-
--type(ack() :: rabbit_guid:guid() | 'blank_ack').
--type(state() :: #iv_state { queue :: queue(),
- qname :: rabbit_amqqueue:name(),
- len :: non_neg_integer(),
- pending_ack :: dict()
- }).
--include("rabbit_backing_queue_spec.hrl").
-
--endif.
-
-start(DurableQueues) ->
- ok = rabbit_sup:start_child(rabbit_persister, [DurableQueues]).
-
-stop() ->
- ok = rabbit_sup:stop_child(rabbit_persister).
-
-init(QName, IsDurable, Recover) ->
- Q = queue:from_list(case IsDurable andalso Recover of
- true -> rabbit_persister:queue_content(QName);
- false -> []
- end),
- #iv_state { queue = Q,
- qname = QName,
- durable = IsDurable,
- len = queue:len(Q),
- pending_ack = dict:new() }.
-
-terminate(State) ->
- State #iv_state { queue = queue:new(), len = 0, pending_ack = dict:new() }.
-
-delete_and_terminate(State = #iv_state { qname = QName, durable = IsDurable,
- pending_ack = PA }) ->
- ok = persist_acks(QName, IsDurable, none, dict:fetch_keys(PA), PA),
- {_PLen, State1} = purge(State),
- terminate(State1).
-
-purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable,
- len = Len }) ->
- %% We do not purge messages pending acks.
- {AckTags, PA} =
- rabbit_misc:queue_fold(
- fun ({#basic_message { is_persistent = false },
- _MsgProps, _IsDelivered}, Acc) ->
- Acc;
- ({Msg = #basic_message { guid = Guid }, MsgProps, IsDelivered},
- {AckTagsN, PAN}) ->
- ok = persist_delivery(QName, IsDurable, IsDelivered, Msg),
- {[Guid | AckTagsN], store_ack(Msg, MsgProps, PAN)}
- end, {[], dict:new()}, Q),
- ok = persist_acks(QName, IsDurable, none, AckTags, PA),
- {Len, State #iv_state { len = 0, queue = queue:new() }}.
-
-publish(Msg, MsgProps, State = #iv_state { queue = Q,
- qname = QName,
- durable = IsDurable,
- len = Len }) ->
- ok = persist_message(QName, IsDurable, none, Msg, MsgProps),
- State #iv_state { queue = enqueue(Msg, MsgProps, false, Q), len = Len + 1 }.
-
-publish_delivered(false, _Msg, _MsgProps, State) ->
- {blank_ack, State};
-publish_delivered(true, Msg = #basic_message { guid = Guid },
- MsgProps,
- State = #iv_state { qname = QName, durable = IsDurable,
- len = 0, pending_ack = PA }) ->
- ok = persist_message(QName, IsDurable, none, Msg, MsgProps),
- ok = persist_delivery(QName, IsDurable, false, Msg),
- {Guid, State #iv_state { pending_ack = store_ack(Msg, MsgProps, PA) }}.
-
-dropwhile(_Pred, State = #iv_state { len = 0 }) ->
- State;
-dropwhile(Pred, State = #iv_state { queue = Q }) ->
- {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q),
- case Pred(MsgProps) of
- true -> {_, State1} = fetch_internal(false, Q1, Msg, MsgProps,
- IsDelivered, State),
- dropwhile(Pred, State1);
- false -> State
- end.
-
-fetch(_AckRequired, State = #iv_state { len = 0 }) ->
- {empty, State};
-fetch(AckRequired, State = #iv_state { queue = Q }) ->
- {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q),
- fetch_internal(AckRequired, Q1, Msg, MsgProps, IsDelivered, State).
-
-fetch_internal(AckRequired, Q1,
- Msg = #basic_message { guid = Guid },
- MsgProps, IsDelivered,
- State = #iv_state { len = Len,
- qname = QName,
- durable = IsDurable,
- pending_ack = PA }) ->
- Len1 = Len - 1,
- ok = persist_delivery(QName, IsDurable, IsDelivered, Msg),
- PA1 = store_ack(Msg, MsgProps, PA),
- {AckTag, PA2} = case AckRequired of
- true -> {Guid, PA1};
- false -> ok = persist_acks(QName, IsDurable, none,
- [Guid], PA1),
- {blank_ack, PA}
- end,
- {{Msg, IsDelivered, AckTag, Len1},
- State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}.
-
-ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable,
- pending_ack = PA }) ->
- ok = persist_acks(QName, IsDurable, none, AckTags, PA),
- PA1 = remove_acks(AckTags, PA),
- State #iv_state { pending_ack = PA1 }.
-
-tx_publish(Txn, Msg, MsgProps, State = #iv_state { qname = QName,
- durable = IsDurable }) ->
- Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn),
- store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }),
- ok = persist_message(QName, IsDurable, Txn, Msg, MsgProps),
- State.
-
-tx_ack(Txn, AckTags, State = #iv_state { qname = QName, durable = IsDurable,
- pending_ack = PA }) ->
- Tx = #tx { pending_acks = Acks } = lookup_tx(Txn),
- store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }),
- ok = persist_acks(QName, IsDurable, Txn, AckTags, PA),
- State.
-
-tx_rollback(Txn, State = #iv_state { qname = QName }) ->
- #tx { pending_acks = AckTags } = lookup_tx(Txn),
- ok = do_if_persistent(fun rabbit_persister:rollback_transaction/1,
- Txn, QName),
- erase_tx(Txn),
- {lists:flatten(AckTags), State}.
-
-tx_commit(Txn, Fun, MsgPropsFun, State = #iv_state { qname = QName,
- pending_ack = PA,
- queue = Q,
- len = Len }) ->
- #tx { pending_acks = AckTags, pending_messages = PubsRev } = lookup_tx(Txn),
- ok = do_if_persistent(fun rabbit_persister:commit_transaction/1,
- Txn, QName),
- erase_tx(Txn),
- Fun(),
- AckTags1 = lists:flatten(AckTags),
- PA1 = remove_acks(AckTags1, PA),
- {Q1, Len1} = lists:foldr(fun ({Msg, MsgProps}, {QN, LenN}) ->
- {enqueue(Msg, MsgPropsFun(MsgProps),
- false, QN),
- LenN + 1}
- end, {Q, Len}, PubsRev),
- {AckTags1, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }}.
-
-requeue(AckTags, MsgPropsFun, State = #iv_state { pending_ack = PA,
- queue = Q,
- len = Len }) ->
- %% We don't need to touch the persister here - the persister will
- %% already have these messages published and delivered as
- %% necessary. The complication is that the persister's seq_id will
- %% now be wrong, given the position of these messages in our queue
- %% here. However, the persister's seq_id is only used for sorting
- %% on startup, and requeue is silent as to where the requeued
- %% messages should appear, thus the persister is permitted to sort
- %% based on seq_id, even though it'll likely give a different
- %% order to the last known state of our queue, prior to shutdown.
- {Q1, Len1} = lists:foldl(
- fun (Guid, {QN, LenN}) ->
- {Msg = #basic_message {}, MsgProps}
- = dict:fetch(Guid, PA),
- {enqueue(Msg, MsgPropsFun(MsgProps), true, QN),
- LenN + 1}
- end, {Q, Len}, AckTags),
- PA1 = remove_acks(AckTags, PA),
- State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }.
-
-enqueue(Msg, MsgProps, IsDelivered, Q) ->
- queue:in({Msg, MsgProps, IsDelivered}, Q).
-
-len(#iv_state { len = Len }) -> Len.
-
-is_empty(State) -> 0 == len(State).
-
-set_ram_duration_target(_DurationTarget, State) -> State.
-
-ram_duration(State) -> {0, State}.
-
-needs_idle_timeout(_State) -> false.
-
-idle_timeout(State) -> State.
-
-handle_pre_hibernate(State) -> State.
-
-status(_State) -> [].
-
-%%----------------------------------------------------------------------------
-
-remove_acks(AckTags, PA) -> lists:foldl(fun dict:erase/2, PA, AckTags).
-
-store_ack(Msg = #basic_message { guid = Guid }, MsgProps, PA) ->
- dict:store(Guid, {Msg, MsgProps}, PA).
-
-%%----------------------------------------------------------------------------
-
-lookup_tx(Txn) ->
- case get({txn, Txn}) of
- undefined -> #tx { pending_messages = [],
- pending_acks = [],
- is_persistent = false };
- V -> V
- end.
-
-store_tx(Txn, Tx) ->
- put({txn, Txn}, Tx).
-
-erase_tx(Txn) ->
- erase({txn, Txn}).
-
-mark_tx_persistent(Txn) ->
- store_tx(Txn, (lookup_tx(Txn)) #tx { is_persistent = true }).
-
-is_tx_persistent(Txn) ->
- (lookup_tx(Txn)) #tx.is_persistent.
-
-do_if_persistent(F, Txn, QName) ->
- ok = case is_tx_persistent(Txn) of
- false -> ok;
- true -> F({Txn, QName})
- end.
-
-%%----------------------------------------------------------------------------
-
-persist_message(QName, true, Txn, Msg = #basic_message {
- is_persistent = true }, MsgProps) ->
- Msg1 = Msg #basic_message {
- %% don't persist any recoverable decoded properties
- content = rabbit_binary_parser:clear_decoded_content(
- Msg #basic_message.content)},
- persist_work(Txn, QName,
- [{publish, Msg1, MsgProps,
- {QName, Msg1 #basic_message.guid}}]);
-persist_message(_QName, _IsDurable, _Txn, _Msg, _MsgProps) ->
- ok.
-
-persist_delivery(QName, true, false, #basic_message { is_persistent = true,
- guid = Guid }) ->
- persist_work(none, QName, [{deliver, {QName, Guid}}]);
-persist_delivery(_QName, _IsDurable, _IsDelivered, _Msg) ->
- ok.
-
-persist_acks(QName, true, Txn, AckTags, PA) ->
- persist_work(Txn, QName,
- [{ack, {QName, Guid}} || Guid <- AckTags,
- begin
- {Msg, _MsgProps}
- = dict:fetch(Guid, PA),
- Msg #basic_message.is_persistent
- end]);
-persist_acks(_QName, _IsDurable, _Txn, _AckTags, _PA) ->
- ok.
-
-persist_work(_Txn,_QName, []) ->
- ok;
-persist_work(none, _QName, WorkList) ->
- rabbit_persister:dirty_work(WorkList);
-persist_work(Txn, QName, WorkList) ->
- mark_tx_persistent(Txn),
- rabbit_persister:extend_transaction({Txn, QName}, WorkList).
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index 863f77e7..a1a8364c 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -41,9 +41,6 @@
-export([debug/1, debug/2, message/4, info/1, info/2,
warning/1, warning/2, error/1, error/2]).
--import(io).
--import(error_logger).
-
-define(SERVER, ?MODULE).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 230f4db5..52d76ac4 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -61,16 +61,12 @@
-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
--export([recursive_delete/1, dict_cons/3, orddict_cons/3,
+-export([recursive_delete/1, recursive_copy/2, dict_cons/3, orddict_cons/3,
unlink_and_capture_exit/1]).
-export([get_options/2]).
-export([all_module_attributes/1, build_acyclic_graph/3]).
-export([now_ms/0]).
-
--import(mnesia).
--import(lists).
--import(cover).
--import(disk_log).
+-export([lock_file/1]).
%%----------------------------------------------------------------------------
@@ -183,6 +179,9 @@
-spec(recursive_delete/1 ::
([file:filename()])
-> rabbit_types:ok_or_error({file:filename(), any()})).
+-spec(recursive_copy/2 ::
+ (file:filename(), file:filename())
+ -> rabbit_types:ok_or_error({file:filename(), file:filename(), any()})).
-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()).
-spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()).
-spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok').
@@ -197,6 +196,7 @@
{bad_edge, [digraph:vertex()]}),
digraph:vertex(), digraph:vertex()})).
-spec(now_ms/0 :: () -> non_neg_integer()).
+-spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')).
-endif.
@@ -242,7 +242,7 @@ assert_args_equivalence1(Orig, New, Name, Key) ->
case {table_lookup(Orig, Key), table_lookup(New, Key)} of
{Same, Same} -> ok;
{Orig1, New1} -> protocol_error(
- not_allowed,
+ precondition_failed,
"inequivalent arg '~s' for ~s: "
"required ~w, received ~w",
[Key, rabbit_misc:rs(Name), New1, Orig1])
@@ -687,6 +687,33 @@ recursive_delete1(Path) ->
end
end.
+recursive_copy(Src, Dest) ->
+ case filelib:is_dir(Src) of
+ false -> case file:copy(Src, Dest) of
+ {ok, _Bytes} -> ok;
+ {error, enoent} -> ok; %% Path doesn't exist anyway
+ {error, Err} -> {error, {Src, Dest, Err}}
+ end;
+ true -> case file:list_dir(Src) of
+ {ok, FileNames} ->
+ case file:make_dir(Dest) of
+ ok ->
+ lists:foldl(
+ fun (FileName, ok) ->
+ recursive_copy(
+ filename:join(Src, FileName),
+ filename:join(Dest, FileName));
+ (_FileName, Error) ->
+ Error
+ end, ok, FileNames);
+ {error, Err} ->
+ {error, {Src, Dest, Err}}
+ end;
+ {error, Err} ->
+ {error, {Src, Dest, Err}}
+ end
+ end.
+
dict_cons(Key, Value, Dict) ->
dict:update(Key, fun (List) -> [Value | List] end, [Value], Dict).
@@ -781,3 +808,12 @@ build_acyclic_graph(VertexFun, EdgeFun, Graph) ->
true = digraph:delete(G),
{error, Reason}
end.
+
+%% TODO: When we stop supporting Erlang prior to R14, this should be
+%% replaced with file:open [write, exclusive]
+lock_file(Path) ->
+ case filelib:is_file(Path) of
+ true -> {error, eexist};
+ false -> {ok, Lock} = file:open(Path, [write]),
+ ok = file:close(Lock)
+ end.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index cb3251c7..dadfc16e 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -34,7 +34,7 @@
-export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0,
cluster/1, force_cluster/1, reset/0, force_reset/0,
- is_clustered/0, empty_ram_only_tables/0]).
+ is_clustered/0, empty_ram_only_tables/0, copy_db/1]).
-export([table_names/0]).
@@ -65,6 +65,7 @@
-spec(is_clustered/0 :: () -> boolean()).
-spec(empty_ram_only_tables/0 :: () -> 'ok').
-spec(create_tables/0 :: () -> 'ok').
+-spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())).
-endif.
@@ -375,21 +376,19 @@ init_db(ClusterNodes, Force) ->
mnesia:system_info(db_nodes)} of
{[], true, [_]} ->
%% True single disc node, attempt upgrade
- wait_for_tables(),
+ ok = wait_for_tables(),
case rabbit_upgrade:maybe_upgrade() of
- ok ->
- ensure_schema_ok();
- version_not_available ->
- schema_ok_or_move()
+ ok -> ensure_schema_ok();
+ version_not_available -> schema_ok_or_move()
end;
{[], true, _} ->
%% "Master" (i.e. without config) disc node in cluster,
%% verify schema
- wait_for_tables(),
+ ok = wait_for_tables(),
ensure_version_ok(rabbit_upgrade:read_version()),
ensure_schema_ok();
{[], false, _} ->
- %% First RAM node in cluster, start from scratch
+ %% Nothing there at all, start from scratch
ok = create_schema();
{[AnotherNode|_], _, _} ->
%% Subsequent node in cluster, catch up
@@ -476,6 +475,16 @@ move_db() ->
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
ok.
+copy_db(Destination) ->
+ mnesia:stop(),
+ case rabbit_misc:recursive_copy(dir(), Destination) of
+ ok ->
+ rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
+ ok = wait_for_tables();
+ {error, E} ->
+ {error, E}
+ end.
+
create_tables() ->
lists:foreach(fun ({Tab, TabDef}) ->
TabDef1 = proplists:delete(match, TabDef),
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index fd84109b..e8b4e8e2 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -34,7 +34,7 @@
-behaviour(gen_server2).
-export([start_link/4, successfully_recovered_state/1,
- client_init/2, client_terminate/1, client_delete_and_terminate/1,
+ client_init/3, client_terminate/1, client_delete_and_terminate/1,
client_ref/1,
write/3, read/2, contains/2, remove/2, release/2, sync/3]).
@@ -83,7 +83,9 @@
cur_file_cache_ets, %% tid of current file cache table
client_refs, %% set of references of all registered clients
successfully_recovered, %% boolean: did we recover state?
- file_size_limit %% how big are our files allowed to get?
+ file_size_limit, %% how big are our files allowed to get?
+ client_ondisk_callback, %% client ref to callback function mapping
+ cref_to_guids %% client ref to synced messages mapping
}).
-record(client_msstate,
@@ -138,16 +140,18 @@
file_handles_ets :: ets:tid(),
file_summary_ets :: ets:tid(),
dedup_cache_ets :: ets:tid(),
- cur_file_cache_ets :: ets:tid() }).
+ cur_file_cache_ets :: ets:tid()}).
-type(startup_fun_state() ::
{(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})),
A}).
+-type(maybe_guid_fun() :: 'undefined' | fun ((gb_set()) -> any())).
-spec(start_link/4 ::
(atom(), file:filename(), [binary()] | 'undefined',
startup_fun_state()) -> rabbit_types:ok_pid_or_error()).
-spec(successfully_recovered_state/1 :: (server()) -> boolean()).
--spec(client_init/2 :: (server(), client_ref()) -> client_msstate()).
+-spec(client_init/3 :: (server(), client_ref(), maybe_guid_fun()) ->
+ client_msstate()).
-spec(client_terminate/1 :: (client_msstate()) -> 'ok').
-spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok').
-spec(client_ref/1 :: (client_msstate()) -> client_ref()).
@@ -334,10 +338,11 @@ start_link(Server, Dir, ClientRefs, StartupFunState) ->
successfully_recovered_state(Server) ->
gen_server2:call(Server, successfully_recovered_state, infinity).
-client_init(Server, Ref) ->
+client_init(Server, Ref, MsgOnDiskFun) ->
{IState, IModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
- gen_server2:call(Server, {new_client_state, Ref}, infinity),
+ gen_server2:call(Server, {new_client_state, Ref, MsgOnDiskFun},
+ infinity),
#client_msstate { server = Server,
client_ref = Ref,
file_handle_cache = dict:new(),
@@ -350,9 +355,9 @@ client_init(Server, Ref) ->
dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts }.
-client_terminate(CState) ->
+client_terminate(CState = #client_msstate { client_ref = Ref }) ->
close_all_handles(CState),
- ok = server_call(CState, client_terminate).
+ ok = server_call(CState, {client_terminate, Ref}).
client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
close_all_handles(CState),
@@ -361,9 +366,10 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
client_ref(#client_msstate { client_ref = Ref }) -> Ref.
write(Guid, Msg,
- CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
+ CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
+ client_ref = CRef }) ->
ok = update_msg_cache(CurFileCacheEts, Guid, Msg),
- ok = server_cast(CState, {write, Guid}).
+ ok = server_cast(CState, {write, CRef, Guid}).
read(Guid,
CState = #client_msstate { dedup_cache_ets = DedupCacheEts,
@@ -392,7 +398,8 @@ read(Guid,
contains(Guid, CState) -> server_call(CState, {contains, Guid}).
remove([], _CState) -> ok;
-remove(Guids, CState) -> server_cast(CState, {remove, Guids}).
+remove(Guids, CState = #client_msstate { client_ref = CRef }) ->
+ server_cast(CState, {remove, CRef, Guids}).
release([], _CState) -> ok;
release(Guids, CState) -> server_cast(CState, {release, Guids}).
sync(Guids, K, CState) -> server_cast(CState, {sync, Guids, K}).
@@ -519,6 +526,13 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer,
end
end.
+clear_client_callback(CRef,
+ State = #msstate { client_ondisk_callback = CODC,
+ cref_to_guids = CTG }) ->
+ State #msstate { client_ondisk_callback = dict:erase(CRef, CODC),
+ cref_to_guids = dict:erase(CRef, CTG)}.
+
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
@@ -586,7 +600,9 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
cur_file_cache_ets = CurFileCacheEts,
client_refs = ClientRefs1,
successfully_recovered = CleanShutdown,
- file_size_limit = FileSizeLimit
+ file_size_limit = FileSizeLimit,
+ client_ondisk_callback = dict:new(),
+ cref_to_guids = dict:new()
},
%% If we didn't recover the msg location index then we need to
@@ -615,10 +631,10 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
prioritise_call(Msg, _From, _State) ->
case Msg of
- successfully_recovered_state -> 7;
- {new_client_state, _Ref} -> 7;
- {read, _Guid} -> 2;
- _ -> 0
+ successfully_recovered_state -> 7;
+ {new_client_state, _Ref, _MODC} -> 7;
+ {read, _Guid} -> 2;
+ _ -> 0
end.
prioritise_cast(Msg, _State) ->
@@ -633,22 +649,29 @@ prioritise_cast(Msg, _State) ->
handle_call(successfully_recovered_state, _From, State) ->
reply(State #msstate.successfully_recovered, State);
-handle_call({new_client_state, CRef}, _From,
- State = #msstate { dir = Dir,
- index_state = IndexState,
- index_module = IndexModule,
- file_handles_ets = FileHandlesEts,
- file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
- cur_file_cache_ets = CurFileCacheEts,
- client_refs = ClientRefs,
- gc_pid = GCPid }) ->
+handle_call({new_client_state, CRef, Callback}, _From,
+ State = #msstate { dir = Dir,
+ index_state = IndexState,
+ index_module = IndexModule,
+ file_handles_ets = FileHandlesEts,
+ file_summary_ets = FileSummaryEts,
+ dedup_cache_ets = DedupCacheEts,
+ cur_file_cache_ets = CurFileCacheEts,
+ client_refs = ClientRefs,
+ client_ondisk_callback = CODC,
+ gc_pid = GCPid }) ->
+ CODC1 = case Callback of
+ undefined -> CODC;
+ _ -> dict:store(CRef, Callback, CODC)
+ end,
reply({IndexState, IndexModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts},
- State #msstate { client_refs = sets:add_element(CRef, ClientRefs) });
+ State #msstate { client_refs = sets:add_element(CRef, ClientRefs),
+ client_ondisk_callback = CODC1 });
-handle_call(client_terminate, _From, State) ->
- reply(ok, State);
+handle_call({client_terminate, CRef}, _From,
+ State) ->
+ reply(ok, clear_client_callback(CRef, State));
handle_call({read, Guid}, From, State) ->
State1 = read_message(Guid, From, State),
@@ -660,43 +683,63 @@ handle_call({contains, Guid}, From, State) ->
handle_cast({client_delete, CRef},
State = #msstate { client_refs = ClientRefs }) ->
- noreply(
- State #msstate { client_refs = sets:del_element(CRef, ClientRefs) });
+ State1 = clear_client_callback(CRef, State),
+ noreply(State1 #msstate {
+ client_refs = sets:del_element(CRef, ClientRefs) });
+
+handle_cast({write, CRef, Guid},
+ State = #msstate { sum_valid_data = SumValid,
+ file_summary_ets = FileSummaryEts,
+ current_file = CurFile,
+ cur_file_cache_ets = CurFileCacheEts,
+ client_ondisk_callback = CODC,
+ cref_to_guids = CTG }) ->
-handle_cast({write, Guid},
- State = #msstate { sum_valid_data = SumValid,
- file_summary_ets = FileSummaryEts,
- cur_file_cache_ets = CurFileCacheEts }) ->
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
[{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
- case index_lookup(Guid, State) of
+ CTG1 = case dict:find(CRef, CODC) of
+ {ok, _} -> dict:update(CRef, fun(Guids) ->
+ gb_sets:add(Guid, Guids)
+ end,
+ gb_sets:empty(), CTG);
+ error -> CTG
+ end,
+ State1 = State #msstate { cref_to_guids = CTG1 },
+ case index_lookup(Guid, State1) of
not_found ->
- write_message(Guid, Msg, State);
+ write_message(Guid, Msg, State1);
#msg_location { ref_count = 0, file = File, total_size = TotalSize } ->
case ets:lookup(FileSummaryEts, File) of
[#file_summary { locked = true }] ->
- ok = index_delete(Guid, State),
- write_message(Guid, Msg, State);
+ ok = index_delete(Guid, State1),
+ write_message(Guid, Msg, State1);
[#file_summary {}] ->
- ok = index_update_ref_count(Guid, 1, State),
+ ok = index_update_ref_count(Guid, 1, State1),
[_] = ets:update_counter(
FileSummaryEts, File,
[{#file_summary.valid_total_size, TotalSize}]),
- noreply(State #msstate {
+ noreply(State1 #msstate {
sum_valid_data = SumValid + TotalSize })
end;
- #msg_location { ref_count = RefCount } ->
+ #msg_location { ref_count = RefCount, file = File } ->
%% We already know about it, just update counter. Only
%% update field otherwise bad interaction with concurrent GC
- ok = index_update_ref_count(Guid, RefCount + 1, State),
- noreply(State)
+ ok = index_update_ref_count(Guid, RefCount + 1, State1),
+ CTG2 = case {dict:find(CRef, CODC), File} of
+ {{ok, _}, CurFile} -> CTG1;
+ {{ok, Fun}, _} -> Fun(gb_sets:singleton(Guid)),
+ CTG;
+ _ -> CTG1
+ end,
+ noreply(State #msstate { cref_to_guids = CTG2 })
end;
-handle_cast({remove, Guids}, State) ->
+handle_cast({remove, CRef, Guids}, State) ->
State1 = lists:foldl(
fun (Guid, State2) -> remove_message(Guid, State2) end,
State, Guids),
- noreply(maybe_compact(State1));
+ State2 = client_confirm(CRef, gb_sets:from_list(Guids), State1),
+ noreply(maybe_compact(State2));
handle_cast({release, Guids}, State =
#msstate { dedup_cache_ets = DedupCacheEts }) ->
@@ -794,14 +837,19 @@ reply(Reply, State) ->
{State1, Timeout} = next_state(State),
{reply, Reply, State1, Timeout}.
-next_state(State = #msstate { on_sync = [], sync_timer_ref = undefined }) ->
- {State, hibernate};
-next_state(State = #msstate { sync_timer_ref = undefined }) ->
- {start_sync_timer(State), 0};
-next_state(State = #msstate { on_sync = [] }) ->
- {stop_sync_timer(State), hibernate};
-next_state(State) ->
- {State, 0}.
+next_state(State = #msstate { sync_timer_ref = undefined,
+ on_sync = Syncs,
+ cref_to_guids = CTG }) ->
+ case {Syncs, dict:size(CTG)} of
+ {[], 0} -> {State, hibernate};
+ _ -> {start_sync_timer(State), 0}
+ end;
+next_state(State = #msstate { on_sync = Syncs,
+ cref_to_guids = CTG }) ->
+ case {Syncs, dict:size(CTG)} of
+ {[], 0} -> {stop_sync_timer(State), hibernate};
+ _ -> {State, 0}
+ end.
start_sync_timer(State = #msstate { sync_timer_ref = undefined }) ->
{ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, sync, [self()]),
@@ -813,15 +861,23 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) ->
{ok, cancel} = timer:cancel(TRef),
State #msstate { sync_timer_ref = undefined }.
-internal_sync(State = #msstate { current_file_handle = CurHdl,
- on_sync = Syncs }) ->
+internal_sync(State = #msstate { current_file_handle = CurHdl,
+ on_sync = Syncs,
+ cref_to_guids = CTG }) ->
State1 = stop_sync_timer(State),
- case Syncs of
- [] -> State1;
- _ -> ok = file_handle_cache:sync(CurHdl),
- lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)),
- State1 #msstate { on_sync = [] }
- end.
+ CGs = dict:fold(fun (CRef, Guids, NS) ->
+ case gb_sets:is_empty(Guids) of
+ true -> NS;
+ false -> [{CRef, Guids} | NS]
+ end
+ end, [], CTG),
+ if Syncs =:= [] andalso CGs =:= [] -> ok;
+ true -> file_handle_cache:sync(CurHdl)
+ end,
+ lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)),
+ [client_confirm(CRef, Guids, State1) || {CRef, Guids} <- CGs],
+ State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }.
+
write_message(Guid, Msg,
State = #msstate { current_file_handle = CurHdl,
@@ -999,6 +1055,25 @@ orddict_store(Key, Val, Dict) ->
false = orddict:is_key(Key, Dict),
orddict:store(Key, Val, Dict).
+client_confirm(CRef, Guids,
+ State = #msstate { client_ondisk_callback = CODC,
+ cref_to_guids = CTG }) ->
+ case dict:find(CRef, CODC) of
+ {ok, Fun} -> Fun(Guids),
+ CTG1 = case dict:find(CRef, CTG) of
+ {ok, Gs} ->
+ Guids1 = gb_sets:difference(Gs, Guids),
+ case gb_sets:is_empty(Guids1) of
+ true -> dict:erase(CRef, CTG);
+ false -> dict:store(CRef, Guids1, CTG)
+ end;
+ error -> CTG
+ end,
+ State #msstate { cref_to_guids = CTG1 };
+ error -> State
+ end.
+
+
%%----------------------------------------------------------------------------
%% file helper functions
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 0940dce2..89954b06 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -32,9 +32,9 @@
-module(rabbit_net).
-include("rabbit.hrl").
--export([async_recv/3, close/1, controlling_process/2,
- getstat/2, peername/1, peercert/1, port_command/2,
- send/2, sockname/1, is_ssl/1]).
+-export([is_ssl/1, controlling_process/2, getstat/2,
+ async_recv/3, port_command/2, send/2, close/1,
+ sockname/1, peername/1, peercert/1]).
%%---------------------------------------------------------------------------
@@ -49,26 +49,25 @@
-type(ok_or_any_error() :: rabbit_types:ok_or_error(any())).
-type(socket() :: port() | #ssl_socket{}).
+-spec(is_ssl/1 :: (socket()) -> boolean()).
+-spec(controlling_process/2 :: (socket(), pid()) -> ok_or_any_error()).
+-spec(getstat/2 ::
+ (socket(), [stat_option()])
+ -> ok_val_or_error([{stat_option(), integer()}])).
-spec(async_recv/3 ::
(socket(), integer(), timeout()) -> rabbit_types:ok(any())).
--spec(close/1 :: (socket()) -> ok_or_any_error()).
--spec(controlling_process/2 :: (socket(), pid()) -> ok_or_any_error()).
-spec(port_command/2 :: (socket(), iolist()) -> 'true').
--spec(send/2 ::
- (socket(), binary() | iolist()) -> ok_or_any_error()).
+-spec(send/2 :: (socket(), binary() | iolist()) -> ok_or_any_error()).
+-spec(close/1 :: (socket()) -> ok_or_any_error()).
+-spec(sockname/1 ::
+ (socket())
+ -> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})).
-spec(peername/1 ::
(socket())
-> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})).
-spec(peercert/1 ::
(socket())
-> 'nossl' | ok_val_or_error(rabbit_ssl:certificate())).
--spec(sockname/1 ::
- (socket())
- -> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})).
--spec(is_ssl/1 :: (socket()) -> boolean()).
--spec(getstat/2 ::
- (socket(), [stat_option()])
- -> ok_val_or_error([{stat_option(), integer()}])).
-endif.
@@ -76,6 +75,18 @@
-define(IS_SSL(Sock), is_record(Sock, ssl_socket)).
+is_ssl(Sock) -> ?IS_SSL(Sock).
+
+controlling_process(Sock, Pid) when ?IS_SSL(Sock) ->
+ ssl:controlling_process(Sock#ssl_socket.ssl, Pid);
+controlling_process(Sock, Pid) when is_port(Sock) ->
+ gen_tcp:controlling_process(Sock, Pid).
+
+getstat(Sock, Stats) when ?IS_SSL(Sock) ->
+ inet:getstat(Sock#ssl_socket.tcp, Stats);
+getstat(Sock, Stats) when is_port(Sock) ->
+ inet:getstat(Sock, Stats).
+
async_recv(Sock, Length, Timeout) when ?IS_SSL(Sock) ->
Pid = self(),
Ref = make_ref(),
@@ -90,31 +101,6 @@ async_recv(Sock, Length, infinity) when is_port(Sock) ->
async_recv(Sock, Length, Timeout) when is_port(Sock) ->
prim_inet:async_recv(Sock, Length, Timeout).
-close(Sock) when ?IS_SSL(Sock) ->
- ssl:close(Sock#ssl_socket.ssl);
-close(Sock) when is_port(Sock) ->
- gen_tcp:close(Sock).
-
-controlling_process(Sock, Pid) when ?IS_SSL(Sock) ->
- ssl:controlling_process(Sock#ssl_socket.ssl, Pid);
-controlling_process(Sock, Pid) when is_port(Sock) ->
- gen_tcp:controlling_process(Sock, Pid).
-
-getstat(Sock, Stats) when ?IS_SSL(Sock) ->
- inet:getstat(Sock#ssl_socket.tcp, Stats);
-getstat(Sock, Stats) when is_port(Sock) ->
- inet:getstat(Sock, Stats).
-
-peername(Sock) when ?IS_SSL(Sock) ->
- ssl:peername(Sock#ssl_socket.ssl);
-peername(Sock) when is_port(Sock) ->
- inet:peername(Sock).
-
-peercert(Sock) when ?IS_SSL(Sock) ->
- ssl:peercert(Sock#ssl_socket.ssl);
-peercert(Sock) when is_port(Sock) ->
- nossl.
-
port_command(Sock, Data) when ?IS_SSL(Sock) ->
case ssl:send(Sock#ssl_socket.ssl, Data) of
ok -> self() ! {inet_reply, Sock, ok},
@@ -124,16 +110,17 @@ port_command(Sock, Data) when ?IS_SSL(Sock) ->
port_command(Sock, Data) when is_port(Sock) ->
erlang:port_command(Sock, Data).
-send(Sock, Data) when ?IS_SSL(Sock) ->
- ssl:send(Sock#ssl_socket.ssl, Data);
-send(Sock, Data) when is_port(Sock) ->
- gen_tcp:send(Sock, Data).
+send(Sock, Data) when ?IS_SSL(Sock) -> ssl:send(Sock#ssl_socket.ssl, Data);
+send(Sock, Data) when is_port(Sock) -> gen_tcp:send(Sock, Data).
+
+close(Sock) when ?IS_SSL(Sock) -> ssl:close(Sock#ssl_socket.ssl);
+close(Sock) when is_port(Sock) -> gen_tcp:close(Sock).
+sockname(Sock) when ?IS_SSL(Sock) -> ssl:sockname(Sock#ssl_socket.ssl);
+sockname(Sock) when is_port(Sock) -> inet:sockname(Sock).
-sockname(Sock) when ?IS_SSL(Sock) ->
- ssl:sockname(Sock#ssl_socket.ssl);
-sockname(Sock) when is_port(Sock) ->
- inet:sockname(Sock).
+peername(Sock) when ?IS_SSL(Sock) -> ssl:peername(Sock#ssl_socket.ssl);
+peername(Sock) when is_port(Sock) -> inet:peername(Sock).
-is_ssl(Sock) ->
- ?IS_SSL(Sock).
+peercert(Sock) when ?IS_SSL(Sock) -> ssl:peercert(Sock#ssl_socket.ssl);
+peercert(Sock) when is_port(Sock) -> nossl.
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
deleted file mode 100644
index 11056c8e..00000000
--- a/src/rabbit_persister.erl
+++ /dev/null
@@ -1,496 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2010 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2010 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_persister).
-
--behaviour(gen_server).
-
--export([start_link/1]).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
--export([transaction/1, extend_transaction/2, dirty_work/1,
- commit_transaction/1, rollback_transaction/1,
- force_snapshot/0, queue_content/1]).
-
--include("rabbit.hrl").
-
--define(SERVER, ?MODULE).
-
--define(LOG_BUNDLE_DELAY, 5).
--define(COMPLETE_BUNDLE_DELAY, 2).
-
--define(PERSISTER_LOG_FORMAT_VERSION, {2, 6}).
-
--record(pstate, {log_handle, entry_count, deadline,
- pending_logs, pending_replies, snapshot}).
-
-%% two tables for efficient persistency
-%% one maps a key to a message
-%% the other maps a key to one or more queues.
-%% The aim is to reduce the overload of storing a message multiple times
-%% when it appears in several queues.
--record(psnapshot, {transactions, messages, queues, next_seq_id}).
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--type(pkey() :: rabbit_guid:guid()).
--type(pmsg() :: {rabbit_amqqueue:name(), pkey()}).
-
--type(work_item() ::
- {publish,
- rabbit_types:message(), rabbit_types:message_properties(), pmsg()} |
- {deliver, pmsg()} |
- {ack, pmsg()}).
-
--spec(start_link/1 :: ([rabbit_amqqueue:name()]) ->
- rabbit_types:ok_pid_or_error()).
--spec(transaction/1 :: ([work_item()]) -> 'ok').
--spec(extend_transaction/2 ::
- ({rabbit_types:txn(), rabbit_amqqueue:name()}, [work_item()])
- -> 'ok').
--spec(dirty_work/1 :: ([work_item()]) -> 'ok').
--spec(commit_transaction/1 ::
- ({rabbit_types:txn(), rabbit_amqqueue:name()}) -> 'ok').
--spec(rollback_transaction/1 ::
- ({rabbit_types:txn(), rabbit_amqqueue:name()}) -> 'ok').
--spec(force_snapshot/0 :: () -> 'ok').
--spec(queue_content/1 ::
- (rabbit_amqqueue:name()) -> [{rabbit_types:message(), boolean()}]).
-
--endif.
-
-%%----------------------------------------------------------------------------
-
-start_link(DurableQueues) ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [DurableQueues], []).
-
-transaction(MessageList) ->
- ?LOGDEBUG("transaction ~p~n", [MessageList]),
- TxnKey = rabbit_guid:guid(),
- gen_server:call(?SERVER, {transaction, TxnKey, MessageList}, infinity).
-
-extend_transaction(TxnKey, MessageList) ->
- ?LOGDEBUG("extend_transaction ~p ~p~n", [TxnKey, MessageList]),
- gen_server:cast(?SERVER, {extend_transaction, TxnKey, MessageList}).
-
-dirty_work(MessageList) ->
- ?LOGDEBUG("dirty_work ~p~n", [MessageList]),
- gen_server:cast(?SERVER, {dirty_work, MessageList}).
-
-commit_transaction(TxnKey) ->
- ?LOGDEBUG("commit_transaction ~p~n", [TxnKey]),
- gen_server:call(?SERVER, {commit_transaction, TxnKey}, infinity).
-
-rollback_transaction(TxnKey) ->
- ?LOGDEBUG("rollback_transaction ~p~n", [TxnKey]),
- gen_server:cast(?SERVER, {rollback_transaction, TxnKey}).
-
-force_snapshot() ->
- gen_server:call(?SERVER, force_snapshot, infinity).
-
-queue_content(QName) ->
- gen_server:call(?SERVER, {queue_content, QName}, infinity).
-
-%%--------------------------------------------------------------------
-
-init([DurableQueues]) ->
- process_flag(trap_exit, true),
- FileName = base_filename(),
- ok = filelib:ensure_dir(FileName),
- Snapshot = #psnapshot{transactions = dict:new(),
- messages = ets:new(messages, []),
- queues = ets:new(queues, [ordered_set]),
- next_seq_id = 0},
- LogHandle =
- case disk_log:open([{name, rabbit_persister},
- {head, current_snapshot(Snapshot)},
- {file, FileName}]) of
- {ok, LH} -> LH;
- {repaired, LH, {recovered, Recovered}, {badbytes, Bad}} ->
- WarningFun = if
- Bad > 0 -> fun rabbit_log:warning/2;
- true -> fun rabbit_log:info/2
- end,
- WarningFun("Repaired persister log - ~p recovered, ~p bad~n",
- [Recovered, Bad]),
- LH
- end,
- {Res, NewSnapshot} =
- internal_load_snapshot(LogHandle, DurableQueues, Snapshot),
- case Res of
- ok ->
- ok = take_snapshot(LogHandle, NewSnapshot);
- {error, Reason} ->
- rabbit_log:error("Failed to load persister log: ~p~n", [Reason]),
- ok = take_snapshot_and_save_old(LogHandle, NewSnapshot)
- end,
- State = #pstate{log_handle = LogHandle,
- entry_count = 0,
- deadline = infinity,
- pending_logs = [],
- pending_replies = [],
- snapshot = NewSnapshot},
- {ok, State}.
-
-handle_call({transaction, Key, MessageList}, From, State) ->
- NewState = internal_extend(Key, MessageList, State),
- do_noreply(internal_commit(From, Key, NewState));
-handle_call({commit_transaction, TxnKey}, From, State) ->
- do_noreply(internal_commit(From, TxnKey, State));
-handle_call(force_snapshot, _From, State) ->
- do_reply(ok, flush(true, State));
-handle_call({queue_content, QName}, _From,
- State = #pstate{snapshot = #psnapshot{messages = Messages,
- queues = Queues}}) ->
- MatchSpec= [{{{QName,'$1'}, '$2', '$3', '$4'}, [],
- [{{'$4', '$1', '$2', '$3'}}]}],
- do_reply([{ets:lookup_element(Messages, K, 2), MP, D} ||
- {_, K, D, MP} <- lists:sort(ets:select(Queues, MatchSpec))],
- State);
-handle_call(_Request, _From, State) ->
- {noreply, State}.
-
-handle_cast({rollback_transaction, TxnKey}, State) ->
- do_noreply(internal_rollback(TxnKey, State));
-handle_cast({dirty_work, MessageList}, State) ->
- do_noreply(internal_dirty_work(MessageList, State));
-handle_cast({extend_transaction, TxnKey, MessageList}, State) ->
- do_noreply(internal_extend(TxnKey, MessageList, State));
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-handle_info(timeout, State = #pstate{deadline = infinity}) ->
- State1 = flush(true, State),
- {noreply, State1, hibernate};
-handle_info(timeout, State) ->
- do_noreply(flush(State));
-handle_info(_Info, State) ->
- {noreply, State}.
-
-terminate(_Reason, State = #pstate{log_handle = LogHandle}) ->
- flush(State),
- disk_log:close(LogHandle),
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, flush(State)}.
-
-%%--------------------------------------------------------------------
-
-internal_extend(Key, MessageList, State) ->
- log_work(fun (ML) -> {extend_transaction, Key, ML} end,
- MessageList, State).
-
-internal_dirty_work(MessageList, State) ->
- log_work(fun (ML) -> {dirty_work, ML} end,
- MessageList, State).
-
-internal_commit(From, Key, State = #pstate{snapshot = Snapshot}) ->
- Unit = {commit_transaction, Key},
- NewSnapshot = internal_integrate1(Unit, Snapshot),
- complete(From, Unit, State#pstate{snapshot = NewSnapshot}).
-
-internal_rollback(Key, State = #pstate{snapshot = Snapshot}) ->
- Unit = {rollback_transaction, Key},
- NewSnapshot = internal_integrate1(Unit, Snapshot),
- log(State#pstate{snapshot = NewSnapshot}, Unit).
-
-complete(From, Item, State = #pstate{deadline = ExistingDeadline,
- pending_logs = Logs,
- pending_replies = Waiting}) ->
- State#pstate{deadline = compute_deadline(
- ?COMPLETE_BUNDLE_DELAY, ExistingDeadline),
- pending_logs = [Item | Logs],
- pending_replies = [From | Waiting]}.
-
-%% This is made to limit disk usage by writing messages only once onto
-%% disk. We keep a table associating pkeys to messages, and provided
-%% the list of messages to output is left to right, we can guarantee
-%% that pkeys will be a backreference to a message in memory when a
-%% "tied" is met.
-log_work(CreateWorkUnit, MessageList,
- State = #pstate{
- snapshot = Snapshot = #psnapshot{messages = Messages}}) ->
- Unit = CreateWorkUnit(
- rabbit_misc:map_in_order(
- fun (M = {publish, Message, MsgProps, QK = {_QName, PKey}}) ->
- case ets:lookup(Messages, PKey) of
- [_] -> {tied, MsgProps, QK};
- [] -> ets:insert(Messages, {PKey, Message}),
- M
- end;
- (M) -> M
- end,
- MessageList)),
- NewSnapshot = internal_integrate1(Unit, Snapshot),
- log(State#pstate{snapshot = NewSnapshot}, Unit).
-
-log(State = #pstate{deadline = ExistingDeadline, pending_logs = Logs},
- Message) ->
- State#pstate{deadline = compute_deadline(?LOG_BUNDLE_DELAY,
- ExistingDeadline),
- pending_logs = [Message | Logs]}.
-
-base_filename() ->
- rabbit_mnesia:dir() ++ "/rabbit_persister.LOG".
-
-take_snapshot(LogHandle, OldFileName, Snapshot) ->
- ok = disk_log:sync(LogHandle),
- %% current_snapshot is the Head (ie. first thing logged)
- ok = disk_log:reopen(LogHandle, OldFileName, current_snapshot(Snapshot)).
-
-take_snapshot(LogHandle, Snapshot) ->
- OldFileName = lists:flatten(base_filename() ++ ".previous"),
- file:delete(OldFileName),
- rabbit_log:info("Rolling persister log to ~p~n", [OldFileName]),
- ok = take_snapshot(LogHandle, OldFileName, Snapshot).
-
-take_snapshot_and_save_old(LogHandle, Snapshot) ->
- {MegaSecs, Secs, MicroSecs} = erlang:now(),
- Timestamp = MegaSecs * 1000000 + Secs * 1000 + MicroSecs,
- OldFileName = lists:flatten(io_lib:format("~s.saved.~p",
- [base_filename(), Timestamp])),
- rabbit_log:info("Saving persister log in ~p~n", [OldFileName]),
- ok = take_snapshot(LogHandle, OldFileName, Snapshot).
-
-maybe_take_snapshot(Force, State = #pstate{entry_count = EntryCount,
- log_handle = LH,
- snapshot = Snapshot}) ->
- {ok, MaxWrapEntries} = application:get_env(persister_max_wrap_entries),
- if
- Force orelse EntryCount >= MaxWrapEntries ->
- ok = take_snapshot(LH, Snapshot),
- State#pstate{entry_count = 0};
- true ->
- State
- end.
-
-later_ms(DeltaMilliSec) ->
- {MegaSec, Sec, MicroSec} = now(),
- %% Note: not normalised. Unimportant for this application.
- {MegaSec, Sec, MicroSec + (DeltaMilliSec * 1000)}.
-
-%% Result = B - A, more or less
-time_diff({B1, B2, B3}, {A1, A2, A3}) ->
- (B1 - A1) * 1000000 + (B2 - A2) + (B3 - A3) / 1000000.0 .
-
-compute_deadline(TimerDelay, infinity) ->
- later_ms(TimerDelay);
-compute_deadline(_TimerDelay, ExistingDeadline) ->
- ExistingDeadline.
-
-compute_timeout(infinity) ->
- {ok, HibernateAfter} = application:get_env(persister_hibernate_after),
- HibernateAfter;
-compute_timeout(Deadline) ->
- DeltaMilliSec = time_diff(Deadline, now()) * 1000.0,
- if
- DeltaMilliSec =< 1 ->
- 0;
- true ->
- round(DeltaMilliSec)
- end.
-
-do_noreply(State = #pstate{deadline = Deadline}) ->
- {noreply, State, compute_timeout(Deadline)}.
-
-do_reply(Reply, State = #pstate{deadline = Deadline}) ->
- {reply, Reply, State, compute_timeout(Deadline)}.
-
-flush(State) -> flush(false, State).
-
-flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs,
- pending_replies = Waiting,
- log_handle = LogHandle}) ->
- State1 = if PendingLogs /= [] ->
- disk_log:alog(LogHandle, lists:reverse(PendingLogs)),
- State#pstate{entry_count = State#pstate.entry_count + 1};
- true ->
- State
- end,
- State2 = maybe_take_snapshot(ForceSnapshot, State1),
- if Waiting /= [] ->
- ok = disk_log:sync(LogHandle),
- lists:foreach(fun (From) -> gen_server:reply(From, ok) end,
- Waiting);
- true ->
- ok
- end,
- State2#pstate{deadline = infinity,
- pending_logs = [],
- pending_replies = []}.
-
-current_snapshot(_Snapshot = #psnapshot{transactions = Ts,
- messages = Messages,
- queues = Queues,
- next_seq_id = NextSeqId}) ->
- %% Avoid infinite growth of the table by removing messages not
- %% bound to a queue anymore
- PKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered,
- _MsgProps, _SeqId}, S) ->
- sets:add_element(PKey, S)
- end, sets:new(), Queues),
- prune_table(Messages, fun (Key) -> sets:is_element(Key, PKeys) end),
- InnerSnapshot = {{txns, Ts},
- {messages, ets:tab2list(Messages)},
- {queues, ets:tab2list(Queues)},
- {next_seq_id, NextSeqId}},
- ?LOGDEBUG("Inner snapshot: ~p~n", [InnerSnapshot]),
- {persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION},
- term_to_binary(InnerSnapshot)}.
-
-prune_table(Tab, Pred) ->
- true = ets:safe_fixtable(Tab, true),
- ok = prune_table(Tab, Pred, ets:first(Tab)),
- true = ets:safe_fixtable(Tab, false).
-
-prune_table(_Tab, _Pred, '$end_of_table') -> ok;
-prune_table(Tab, Pred, Key) ->
- case Pred(Key) of
- true -> ok;
- false -> ets:delete(Tab, Key)
- end,
- prune_table(Tab, Pred, ets:next(Tab, Key)).
-
-internal_load_snapshot(LogHandle,
- DurableQueues,
- Snapshot = #psnapshot{messages = Messages,
- queues = Queues}) ->
- {K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start),
- case check_version(Loaded_Snapshot) of
- {ok, StateBin} ->
- {{txns, Ts}, {messages, Ms}, {queues, Qs},
- {next_seq_id, NextSeqId}} = binary_to_term(StateBin),
- true = ets:insert(Messages, Ms),
- true = ets:insert(Queues, Qs),
- Snapshot1 = replay(Items, LogHandle, K,
- Snapshot#psnapshot{
- transactions = Ts,
- next_seq_id = NextSeqId}),
- %% Remove all entries for queues that no longer exist.
- %% Note that the 'messages' table is pruned when the next
- %% snapshot is taken.
- DurableQueuesSet = sets:from_list(DurableQueues),
- prune_table(Snapshot1#psnapshot.queues,
- fun ({QName, _PKey}) ->
- sets:is_element(QName, DurableQueuesSet)
- end),
- %% uncompleted transactions are discarded - this is TRTTD
- %% since we only get into this code on node restart, so
- %% any uncompleted transactions will have been aborted.
- {ok, Snapshot1#psnapshot{transactions = dict:new()}};
- {error, Reason} -> {{error, Reason}, Snapshot}
- end.
-
-check_version({persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION},
- StateBin}) ->
- {ok, StateBin};
-check_version({persist_snapshot, {vsn, Vsn}, _StateBin}) ->
- {error, {unsupported_persister_log_format, Vsn}};
-check_version(_Other) ->
- {error, unrecognised_persister_log_format}.
-
-replay([], LogHandle, K, Snapshot) ->
- case disk_log:chunk(LogHandle, K) of
- {K1, Items} ->
- replay(Items, LogHandle, K1, Snapshot);
- {K1, Items, Badbytes} ->
- rabbit_log:warning("~p bad bytes recovering persister log~n",
- [Badbytes]),
- replay(Items, LogHandle, K1, Snapshot);
- eof -> Snapshot
- end;
-replay([Item | Items], LogHandle, K, Snapshot) ->
- NewSnapshot = internal_integrate_messages(Item, Snapshot),
- replay(Items, LogHandle, K, NewSnapshot).
-
-internal_integrate_messages(Items, Snapshot) ->
- lists:foldl(fun (Item, Snap) -> internal_integrate1(Item, Snap) end,
- Snapshot, Items).
-
-internal_integrate1({extend_transaction, Key, MessageList},
- Snapshot = #psnapshot {transactions = Transactions}) ->
- Snapshot#psnapshot{transactions = rabbit_misc:dict_cons(Key, MessageList,
- Transactions)};
-internal_integrate1({rollback_transaction, Key},
- Snapshot = #psnapshot{transactions = Transactions}) ->
- Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)};
-internal_integrate1({commit_transaction, Key},
- Snapshot = #psnapshot{transactions = Transactions,
- messages = Messages,
- queues = Queues,
- next_seq_id = SeqId}) ->
- case dict:find(Key, Transactions) of
- {ok, MessageLists} ->
- ?LOGDEBUG("persist committing txn ~p~n", [Key]),
- NextSeqId =
- lists:foldr(
- fun (ML, SeqIdN) ->
- perform_work(ML, Messages, Queues, SeqIdN) end,
- SeqId, MessageLists),
- Snapshot#psnapshot{transactions = dict:erase(Key, Transactions),
- next_seq_id = NextSeqId};
- error ->
- Snapshot
- end;
-internal_integrate1({dirty_work, MessageList},
- Snapshot = #psnapshot{messages = Messages,
- queues = Queues,
- next_seq_id = SeqId}) ->
- Snapshot#psnapshot{next_seq_id = perform_work(MessageList, Messages,
- Queues, SeqId)}.
-
-perform_work(MessageList, Messages, Queues, SeqId) ->
- lists:foldl(fun (Item, NextSeqId) ->
- perform_work_item(Item, Messages, Queues, NextSeqId)
- end, SeqId, MessageList).
-
-perform_work_item({publish, Message, MsgProps, QK = {_QName, PKey}},
- Messages, Queues, NextSeqId) ->
- true = ets:insert(Messages, {PKey, Message}),
- true = ets:insert(Queues, {QK, false, MsgProps, NextSeqId}),
- NextSeqId + 1;
-
-perform_work_item({tied, MsgProps, QK}, _Messages, Queues, NextSeqId) ->
- true = ets:insert(Queues, {QK, false, MsgProps, NextSeqId}),
- NextSeqId + 1;
-
-perform_work_item({deliver, QK}, _Messages, Queues, NextSeqId) ->
- true = ets:update_element(Queues, QK, {2, true}),
- NextSeqId;
-
-perform_work_item({ack, QK}, _Messages, Queues, NextSeqId) ->
- true = ets:delete(Queues, QK),
- NextSeqId.
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_prelaunch.erl
index 072f297e..35d0876c 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_prelaunch.erl
@@ -29,11 +29,12 @@
%% Contributor(s): ______________________________________.
%%
--module(rabbit_plugin_activator).
+-module(rabbit_prelaunch).
-export([start/0, stop/0]).
-define(BaseApps, [rabbit]).
+-define(ERROR_CODE, 1).
%%----------------------------------------------------------------------------
%% Specs
@@ -52,7 +53,7 @@ start() ->
io:format("Activating RabbitMQ plugins ...~n"),
%% Determine our various directories
- [PluginDir, UnpackedPluginDir] = init:get_plain_arguments(),
+ [PluginDir, UnpackedPluginDir, Node] = init:get_plain_arguments(),
RootName = UnpackedPluginDir ++ "/rabbit",
%% Unpack any .ez plugins
@@ -130,7 +131,10 @@ start() ->
[io:format("* ~s-~s~n", [App, proplists:get_value(App, AppVersions)])
|| App <- PluginApps],
io:nl(),
- halt(),
+
+ ok = duplicate_node_check(Node),
+
+ terminate(0),
ok.
stop() ->
@@ -251,6 +255,37 @@ process_entry(Entry = {apply,{application,start_boot,[rabbit,permanent]}}) ->
process_entry(Entry) ->
[Entry].
+%% Check whether a node with the same name is already running
+duplicate_node_check([]) ->
+ %% Ignore running node while installing windows service
+ ok;
+duplicate_node_check(Node) ->
+ {NodeName, NodeHost} = rabbit_misc:nodeparts(Node),
+ case net_adm:names(NodeHost) of
+ {ok, NamePorts} ->
+ case proplists:is_defined(NodeName, NamePorts) of
+ true -> io:format("node with name ~p "
+ "already running on ~p~n",
+ [NodeName, NodeHost]),
+ [io:format(Fmt ++ "~n", Args) ||
+ {Fmt, Args} <- rabbit_control:diagnostics(Node)],
+ terminate(?ERROR_CODE);
+ false -> ok
+ end;
+ {error, address} -> ok;
+ {error, EpmdReason} -> terminate("unexpected epmd error: ~p~n",
+ [EpmdReason])
+ end.
+
terminate(Fmt, Args) ->
io:format("ERROR: " ++ Fmt ++ "~n", Args),
- halt(1).
+ terminate(?ERROR_CODE).
+
+terminate(Status) ->
+ case os:type() of
+ {unix, _} -> halt(Status);
+ {win32, _} -> init:stop(Status),
+ receive
+ after infinity -> ok
+ end
+ end.
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 248c1fbc..76c0a4ef 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -31,7 +31,7 @@
-module(rabbit_queue_index).
--export([init/1, shutdown_terms/1, recover/4,
+-export([init/2, shutdown_terms/1, recover/5,
terminate/2, delete_and_terminate/1,
publish/5, deliver/2, ack/2, sync/2, flush/1, read/3,
next_segment_boundary/1, bounds/1, recover/1]).
@@ -174,7 +174,7 @@
%%----------------------------------------------------------------------------
-record(qistate, { dir, segments, journal_handle, dirty_count,
- max_journal_entries }).
+ max_journal_entries, on_sync, unsynced_guids }).
-record(segment, { num, path, journal_entries, unacked }).
@@ -195,21 +195,24 @@
})).
-type(seq_id() :: integer()).
-type(seg_dict() :: {dict(), [segment()]}).
+-type(on_sync_fun() :: fun ((gb_set()) -> ok)).
-type(qistate() :: #qistate { dir :: file:filename(),
segments :: 'undefined' | seg_dict(),
journal_handle :: hdl(),
dirty_count :: integer(),
- max_journal_entries :: non_neg_integer()
+ max_journal_entries :: non_neg_integer(),
+ on_sync :: on_sync_fun(),
+ unsynced_guids :: [rabbit_guid:guid()]
}).
-type(startup_fun_state() ::
- {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})),
+ {fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A}),
A}).
-type(shutdown_terms() :: [any()]).
--spec(init/1 :: (rabbit_amqqueue:name()) -> qistate()).
+-spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()).
-spec(shutdown_terms/1 :: (rabbit_amqqueue:name()) -> shutdown_terms()).
--spec(recover/4 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(),
- fun ((rabbit_guid:guid()) -> boolean())) ->
+-spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(),
+ fun ((rabbit_guid:guid()) -> boolean()), on_sync_fun()) ->
{'undefined' | non_neg_integer(), qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
-spec(delete_and_terminate/1 :: (qistate()) -> qistate()).
@@ -227,8 +230,8 @@
-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
-spec(bounds/1 :: (qistate()) ->
{non_neg_integer(), non_neg_integer(), qistate()}).
--spec(recover/1 ::
- ([rabbit_amqqueue:name()]) -> {[[any()]], startup_fun_state()}).
+-spec(recover/1 :: ([rabbit_amqqueue:name()]) ->
+ {[[any()]], startup_fun_state()}).
-spec(add_queue_ttl/0 :: () -> 'ok').
@@ -239,10 +242,10 @@
%% public API
%%----------------------------------------------------------------------------
-init(Name) ->
+init(Name, OnSyncFun) ->
State = #qistate { dir = Dir } = blank_state(Name),
false = filelib:is_file(Dir), %% is_file == is file or dir
- State.
+ State #qistate { on_sync = OnSyncFun }.
shutdown_terms(Name) ->
#qistate { dir = Dir } = blank_state(Name),
@@ -251,13 +254,14 @@ shutdown_terms(Name) ->
{ok, Terms1} -> Terms1
end.
-recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun) ->
+recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) ->
State = #qistate { dir = Dir } = blank_state(Name),
+ State1 = State #qistate { on_sync = OnSyncFun },
CleanShutdown = detect_clean_shutdown(Dir),
case CleanShutdown andalso MsgStoreRecovered of
true -> RecoveredCounts = proplists:get_value(segments, Terms, []),
- init_clean(RecoveredCounts, State);
- false -> init_dirty(CleanShutdown, ContainsCheckFun, State)
+ init_clean(RecoveredCounts, State1);
+ false -> init_dirty(CleanShutdown, ContainsCheckFun, State1)
end.
terminate(Terms, State) ->
@@ -270,9 +274,13 @@ delete_and_terminate(State) ->
ok = rabbit_misc:recursive_delete([Dir]),
State1.
-publish(Guid, SeqId, MsgProps, IsPersistent, State) when is_binary(Guid) ->
+publish(Guid, SeqId, MsgProps, IsPersistent,
+ State = #qistate { unsynced_guids = UnsyncedGuids })
+ when is_binary(Guid) ->
?GUID_BYTES = size(Guid),
- {JournalHdl, State1} = get_journal_handle(State),
+ {JournalHdl, State1} = get_journal_handle(
+ State #qistate {
+ unsynced_guids = [Guid | UnsyncedGuids] }),
ok = file_handle_cache:append(
JournalHdl, [<<(case IsPersistent of
true -> ?PUB_PERSIST_JPREFIX;
@@ -303,7 +311,7 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
%% seqids not being in the journal, provided the transaction isn't
%% emptied (handled above anyway).
ok = file_handle_cache:sync(JournalHdl),
- State.
+ notify_sync(State).
flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State) -> flush_journal(State).
@@ -393,7 +401,9 @@ blank_state(QueueName) ->
segments = segments_new(),
journal_handle = undefined,
dirty_count = 0,
- max_journal_entries = MaxJournal }.
+ max_journal_entries = MaxJournal,
+ on_sync = fun (_) -> ok end,
+ unsynced_guids = [] }.
clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
@@ -625,7 +635,7 @@ flush_journal(State = #qistate { segments = Segments }) ->
{JournalHdl, State1} =
get_journal_handle(State #qistate { segments = Segments1 }),
ok = file_handle_cache:clear(JournalHdl),
- State1 #qistate { dirty_count = 0 }.
+ notify_sync(State1 #qistate { dirty_count = 0 }).
append_journal_to_segment(#segment { journal_entries = JEntries,
path = Path } = Segment) ->
@@ -713,6 +723,10 @@ deliver_or_ack(Kind, SeqIds, State) ->
add_to_journal(SeqId, Kind, StateN)
end, State1, SeqIds)).
+notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) ->
+ OnSyncFun(gb_sets:from_list(UG)),
+ State #qistate { unsynced_guids = [] }.
+
%%----------------------------------------------------------------------------
%% segment manipulation
%%----------------------------------------------------------------------------
@@ -1039,27 +1053,26 @@ transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) ->
transform_file(Path, Fun) ->
PathTmp = Path ++ ".upgrade",
- Size = filelib:file_size(Path),
-
- {ok, PathTmpHdl} =
- file_handle_cache:open(PathTmp, [exclusive | ?WRITE_MODE],
- [{write_buffer, infinity}]),
+ case filelib:file_size(Path) of
+ 0 -> ok;
+ Size -> {ok, PathTmpHdl} =
+ file_handle_cache:open(PathTmp, ?WRITE_MODE,
+ [{write_buffer, infinity}]),
- {ok, PathHdl} =
- file_handle_cache:open(Path, [{read_ahead, Size} | ?READ_MODE], []),
- {ok, Content} = file_handle_cache:read(PathHdl, Size),
- ok = file_handle_cache:close(PathHdl),
+ {ok, PathHdl} = file_handle_cache:open(
+ Path, [{read_ahead, Size} | ?READ_MODE], []),
+ {ok, Content} = file_handle_cache:read(PathHdl, Size),
+ ok = file_handle_cache:close(PathHdl),
- ok = drive_transform_fun(Fun, PathTmpHdl, Content),
+ ok = drive_transform_fun(Fun, PathTmpHdl, Content),
- ok = file_handle_cache:close(PathTmpHdl),
- ok = file:rename(PathTmp, Path).
+ ok = file_handle_cache:close(PathTmpHdl),
+ ok = file:rename(PathTmp, Path)
+ end.
drive_transform_fun(Fun, Hdl, Contents) ->
case Fun(Contents) of
- stop ->
- ok;
- {Output, Contents1} ->
- ok = file_handle_cache:append(Hdl, Output),
- drive_transform_fun(Fun, Hdl, Contents1)
+ stop -> ok;
+ {Output, Contents1} -> ok = file_handle_cache:append(Hdl, Output),
+ drive_transform_fun(Fun, Hdl, Contents1)
end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 6b3276be..15b20bc4 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -593,8 +593,9 @@ handle_frame(Type, Channel, Payload,
%% We're already closing this channel, so
%% there's no cleanup to do (notify
%% queues, etc.)
- ok = rabbit_writer:send_command(State#v1.sock,
- #'channel.close_ok'{});
+ ok = rabbit_writer:internal_send_command(
+ State#v1.sock, Channel,
+ #'channel.close_ok'{}, Protocol);
_ -> ok
end,
State;
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 00df1ce1..d49c072c 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -72,7 +72,8 @@ deliver(QNames, Delivery = #delivery{mandatory = false,
QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
{routed, QPids};
-deliver(QNames, Delivery) ->
+deliver(QNames, Delivery = #delivery{mandatory = Mandatory,
+ immediate = Immediate}) ->
QPids = lookup_qpids(QNames),
{Success, _} =
delegate:invoke(QPids,
@@ -80,9 +81,9 @@ deliver(QNames, Delivery) ->
rabbit_amqqueue:deliver(Pid, Delivery)
end),
{Routed, Handled} =
- lists:foldl(fun fold_deliveries/2, {false, []}, Success),
- check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
- {Routed, Handled}).
+ lists:foldl(fun fold_deliveries/2, {false, []}, Success),
+ check_delivery(Mandatory, Immediate, {Routed, Handled}).
+
%% TODO: Maybe this should be handled by a cursor instead.
%% TODO: This causes a full scan for each entry with the same source
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 27e4d925..adf968cb 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -35,8 +35,6 @@
-export([all_tests/0, test_parsing/0]).
--import(lists).
-
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
-include_lib("kernel/include/file.hrl").
@@ -1470,12 +1468,12 @@ msg_store_remove(MsgStore, Ref, Guids) ->
with_msg_store_client(MsgStore, Ref, Fun) ->
rabbit_msg_store:client_terminate(
- Fun(rabbit_msg_store:client_init(MsgStore, Ref))).
+ Fun(rabbit_msg_store:client_init(MsgStore, Ref, undefined))).
foreach_with_msg_store_client(MsgStore, Ref, Fun, L) ->
rabbit_msg_store:client_terminate(
lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MSCState) end,
- rabbit_msg_store:client_init(MsgStore, Ref), L)).
+ rabbit_msg_store:client_init(MsgStore, Ref, undefined), L)).
test_msg_store() ->
restart_msg_store_empty(),
@@ -1483,7 +1481,8 @@ test_msg_store() ->
Guids = [guid_bin(M) || M <- lists:seq(1,100)],
{Guids1stHalf, Guids2ndHalf} = lists:split(50, Guids),
Ref = rabbit_guid:guid(),
- MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref),
+ MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref,
+ undefined),
%% check we don't contain any of the msgs we're about to publish
false = msg_store_contains(false, Guids, MSCState),
%% publish the first half
@@ -1549,7 +1548,8 @@ test_msg_store() ->
([Guid|GuidsTail]) ->
{Guid, 0, GuidsTail}
end, Guids2ndHalf}),
- MSCState5 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref),
+ MSCState5 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref,
+ undefined),
%% check we have the right msgs left
lists:foldl(
fun (Guid, Bool) ->
@@ -1558,7 +1558,8 @@ test_msg_store() ->
ok = rabbit_msg_store:client_terminate(MSCState5),
%% restart empty
restart_msg_store_empty(),
- MSCState6 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref),
+ MSCState6 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref,
+ undefined),
%% check we don't contain any of the msgs
false = msg_store_contains(false, Guids, MSCState6),
%% publish the first half again
@@ -1566,7 +1567,8 @@ test_msg_store() ->
%% this should force some sort of sync internally otherwise misread
ok = rabbit_msg_store:client_terminate(
msg_store_read(Guids1stHalf, MSCState6)),
- MSCState7 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref),
+ MSCState7 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref,
+ undefined),
ok = rabbit_msg_store:remove(Guids1stHalf, MSCState7),
ok = rabbit_msg_store:client_terminate(MSCState7),
%% restart empty
@@ -1625,12 +1627,13 @@ init_test_queue() ->
Terms = rabbit_queue_index:shutdown_terms(TestQueue),
PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:guid()),
PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE,
- PRef),
+ PRef, undefined),
Res = rabbit_queue_index:recover(
TestQueue, Terms, false,
fun (Guid) ->
rabbit_msg_store:contains(Guid, PersistentClient)
- end),
+ end,
+ fun nop/1),
ok = rabbit_msg_store:client_delete_and_terminate(PersistentClient),
Res.
@@ -1658,7 +1661,7 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
true -> ?PERSISTENT_MSG_STORE;
false -> ?TRANSIENT_MSG_STORE
end,
- MSCState = rabbit_msg_store:client_init(MsgStore, Ref),
+ MSCState = rabbit_msg_store:client_init(MsgStore, Ref, undefined),
{A, B} =
lists:foldl(
fun (SeqId, {QiN, SeqIdsGuidsAcc}) ->
@@ -1850,7 +1853,8 @@ assert_props(List, PropVals) ->
with_fresh_variable_queue(Fun) ->
ok = empty_test_queue(),
- VQ = rabbit_variable_queue:init(test_queue(), true, false),
+ VQ = rabbit_variable_queue:init(test_queue(), true, false,
+ fun nop/1, fun nop/1),
S0 = rabbit_variable_queue:status(VQ),
assert_props(S0, [{q1, 0}, {q2, 0},
{delta, {delta, undefined, 0, undefined}},
@@ -1892,7 +1896,7 @@ test_variable_queue_ack_limiting(VQ0) ->
VQ6 = check_variable_queue_status(
rabbit_variable_queue:set_ram_duration_target(0, VQ5),
[{len, Len div 2},
- {target_ram_item_count, 0},
+ {target_ram_count, 0},
{ram_msg_count, 0},
{ram_ack_count, 0}]),
@@ -1935,7 +1939,6 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
%% start by sending in a couple of segments worth
Len = 2*SegmentSize,
VQ1 = variable_queue_publish(false, Len, VQ0),
-
%% squeeze and relax queue
Churn = Len div 32,
VQ2 = publish_fetch_and_ack(Churn, Len, VQ1),
@@ -1953,7 +1956,7 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
%% drain
{VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7),
- VQ9 = rabbit_variable_queue:ack(AckTags, VQ8),
+ {_, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8),
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -1963,7 +1966,8 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
{{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
- publish_fetch_and_ack(N-1, Len, rabbit_variable_queue:ack([AckTag], VQ2)).
+ {_, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2),
+ publish_fetch_and_ack(N-1, Len, VQ3).
test_variable_queue_partial_segments_delta_thing(VQ0) ->
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
@@ -1996,7 +2000,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
{len, HalfSegment + 1}]),
{VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
HalfSegment + 1, VQ7),
- VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
+ {_, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
%% should be empty now
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -2025,7 +2029,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
{VQ5, _AckTags1} = variable_queue_fetch(Count, false, false,
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = rabbit_variable_queue:init(test_queue(), true, true),
+ VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
+ fun nop/1, fun nop/1),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
VQ9 = variable_queue_publish(false, 1, VQ8),
@@ -2041,7 +2046,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
VQ5 = rabbit_variable_queue:idle_timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = rabbit_variable_queue:init(test_queue(), true, true),
+ VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
+ fun nop/1, fun nop/1),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
@@ -2071,7 +2077,8 @@ test_queue_recover() ->
{ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
- VQ1 = rabbit_variable_queue:init(QName, true, true),
+ VQ1 = rabbit_variable_queue:init(QName, true, true,
+ fun nop/1, fun nop/1),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
_VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
@@ -2131,3 +2138,5 @@ test_configurable_server_properties() ->
application:set_env(rabbit, server_properties, ServerProperties),
passed.
+
+nop(_) -> ok.
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index b9993823..548014be 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -90,7 +90,8 @@
sender :: pid(),
message :: message()}).
-type(message_properties() ::
- #message_properties{expiry :: pos_integer() | 'undefined'}).
+ #message_properties{expiry :: pos_integer() | 'undefined',
+ needs_confirming :: boolean()}).
%% this is really an abstract type, but dialyzer does not support them
-type(txn() :: rabbit_guid:guid()).
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 27a94f6f..97a07514 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -126,19 +126,35 @@ heads(G) ->
%% -------------------------------------------------------------------
apply_upgrades(Upgrades) ->
- LockFile = lock_filename(),
- case file:open(LockFile, [write, exclusive]) of
- {ok, Lock} ->
- ok = file:close(Lock),
+ LockFile = lock_filename(dir()),
+ case rabbit_misc:lock_file(LockFile) of
+ ok ->
+ BackupDir = dir() ++ "-upgrade-backup",
info("Upgrades: ~w to apply~n", [length(Upgrades)]),
- [apply_upgrade(Upgrade) || Upgrade <- Upgrades],
- info("Upgrades: All applied~n", []),
- ok = write_version(),
- ok = file:delete(LockFile);
+ case rabbit_mnesia:copy_db(BackupDir) of
+ ok ->
+ %% We need to make the backup after creating the
+ %% lock file so that it protects us from trying to
+ %% overwrite the backup. Unfortunately this means
+ %% the lock file exists in the backup too, which
+ %% is not intuitive. Remove it.
+ ok = file:delete(lock_filename(BackupDir)),
+ info("Upgrades: Mnesia dir backed up to ~p~n", [BackupDir]),
+ [apply_upgrade(Upgrade) || Upgrade <- Upgrades],
+ info("Upgrades: All upgrades applied successfully~n", []),
+ ok = write_version(),
+ ok = rabbit_misc:recursive_delete([BackupDir]),
+ info("Upgrades: Mnesia backup removed~n", []),
+ ok = file:delete(LockFile);
+ {error, E} ->
+ %% If we can't backup, the upgrade hasn't started
+ %% hence we don't need the lockfile since the real
+ %% mnesia dir is the good one.
+ ok = file:delete(LockFile),
+ throw({could_not_back_up_mnesia_dir, E})
+ end;
{error, eexist} ->
- throw({error, previous_upgrade_failed});
- {error, _} = Error ->
- throw(Error)
+ throw({error, previous_upgrade_failed})
end.
apply_upgrade({M, F}) ->
@@ -151,7 +167,7 @@ dir() -> rabbit_mnesia:dir().
schema_filename() -> filename:join(dir(), ?VERSION_FILENAME).
-lock_filename() -> filename:join(dir(), ?LOCK_FILENAME).
+lock_filename(Dir) -> filename:join(Dir, ?LOCK_FILENAME).
%% NB: we cannot use rabbit_log here since it may not have been
%% started yet
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 5ac042a2..0db51165 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -42,7 +42,7 @@
-export([start/1, stop/0]).
%% exported for testing only
--export([start_msg_store/2, stop_msg_store/0]).
+-export([start_msg_store/2, stop_msg_store/0, init/5]).
%%----------------------------------------------------------------------------
%% Definitions:
@@ -158,7 +158,7 @@
%% The conversion from alphas to betas is also chunked, but only to
%% ensure no more than ?IO_BATCH_SIZE alphas are converted to betas at
%% any one time. This further smooths the effects of changes to the
-%% target_ram_item_count and ensures the queue remains responsive
+%% target_ram_count and ensures the queue remains responsive
%% even when there is a large amount of IO work to do. The
%% idle_timeout callback is utilised to ensure that conversions are
%% done as promptly as possible whilst ensuring the queue remains
@@ -256,18 +256,21 @@
len,
persistent_count,
- target_ram_item_count,
+ target_ram_count,
ram_msg_count,
ram_msg_count_prev,
ram_ack_count_prev,
ram_index_count,
out_counter,
in_counter,
+ rates,
+ msgs_on_disk,
+ msg_indices_on_disk,
+ unconfirmed,
ack_out_counter,
ack_in_counter,
- rates,
ack_rates
- }).
+ }).
-record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }).
@@ -348,15 +351,18 @@
persistent_count :: non_neg_integer(),
transient_threshold :: non_neg_integer(),
- target_ram_item_count :: non_neg_integer() | 'infinity',
+ target_ram_count :: non_neg_integer() | 'infinity',
ram_msg_count :: non_neg_integer(),
ram_msg_count_prev :: non_neg_integer(),
ram_index_count :: non_neg_integer(),
out_counter :: non_neg_integer(),
in_counter :: non_neg_integer(),
+ rates :: rates(),
+ msgs_on_disk :: gb_set(),
+ msg_indices_on_disk :: gb_set(),
+ unconfirmed :: gb_set(),
ack_out_counter :: non_neg_integer(),
ack_in_counter :: non_neg_integer(),
- rates :: rates(),
ack_rates :: rates() }).
-include("rabbit_backing_queue_spec.hrl").
@@ -403,16 +409,23 @@ stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
-init(QueueName, IsDurable, false) ->
- IndexState = rabbit_queue_index:init(QueueName),
+init(QueueName, IsDurable, Recover) ->
+ Self = self(),
+ init(QueueName, IsDurable, Recover,
+ fun (Guids) -> msgs_written_to_disk(Self, Guids) end,
+ fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end).
+
+init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) ->
+ IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun),
init(IsDurable, IndexState, 0, [],
case IsDurable of
- true -> msg_store_client_init(?PERSISTENT_MSG_STORE);
+ true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
+ MsgOnDiskFun);
false -> undefined
end,
- msg_store_client_init(?TRANSIENT_MSG_STORE));
+ msg_store_client_init(?TRANSIENT_MSG_STORE, undefined));
-init(QueueName, true, true) ->
+init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) ->
Terms = rabbit_queue_index:shutdown_terms(QueueName),
{PRef, TRef, Terms1} =
case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of
@@ -422,16 +435,17 @@ init(QueueName, true, true) ->
_ -> {rabbit_guid:guid(), rabbit_guid:guid(), []}
end,
PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE,
- PRef),
+ PRef, MsgOnDiskFun),
TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE,
- TRef),
+ TRef, undefined),
{DeltaCount, IndexState} =
rabbit_queue_index:recover(
QueueName, Terms1,
rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
fun (Guid) ->
rabbit_msg_store:contains(Guid, PersistentClient)
- end),
+ end,
+ MsgIdxOnDiskFun),
init(true, IndexState, DeltaCount, Terms1,
PersistentClient, TransientClient).
@@ -507,25 +521,30 @@ publish(Msg, MsgProps, State) ->
publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) ->
{blank_ack, a(State)};
-publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent },
- MsgProps,
+publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
+ guid = Guid },
+ MsgProps = #message_properties {
+ needs_confirming = NeedsConfirming },
State = #vqstate { len = 0,
next_seq_id = SeqId,
out_counter = OutCount,
in_counter = InCount,
persistent_count = PCount,
- durable = IsDurable }) ->
+ durable = IsDurable,
+ unconfirmed = Unconfirmed }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = true },
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
+ Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed),
{SeqId, a(reduce_memory_use(
State2 #vqstate { next_seq_id = SeqId + 1,
out_counter = OutCount + 1,
in_counter = InCount + 1,
- persistent_count = PCount1 }))}.
+ persistent_count = PCount1,
+ unconfirmed = Unconfirmed1 }))}.
dropwhile(Pred, State) ->
{_OkOrEmpty, State1} = dropwhile1(Pred, State),
@@ -635,9 +654,15 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
persistent_count = PCount1 })}.
ack(AckTags, State) ->
- a(ack(fun msg_store_remove/3,
- fun (_AckEntry, State1) -> State1 end,
- AckTags, State)).
+ {Guids, State1} =
+ ack(fun msg_store_remove/3,
+ fun ({_IsPersistent, Guid, _MsgProps}, State1) ->
+ remove_confirms(gb_sets:singleton(Guid), State1);
+ (#msg_status{msg = #basic_message{guid = Guid}}, State1) ->
+ remove_confirms(gb_sets:singleton(Guid), State1)
+ end,
+ AckTags, State),
+ {Guids, a(State1)}.
tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps,
State = #vqstate { durable = IsDurable,
@@ -687,7 +712,7 @@ tx_commit(Txn, Fun, MsgPropsFun,
end)}.
requeue(AckTags, MsgPropsFun, State) ->
- a(reduce_memory_use(
+ {_Guids, State1} =
ack(fun msg_store_release/3,
fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
{_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps),
@@ -702,32 +727,31 @@ requeue(AckTags, MsgPropsFun, State) ->
true, true, State2),
State3
end,
- AckTags, State))).
+ AckTags, State),
+ a(reduce_memory_use(State1)).
len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
-set_ram_duration_target(DurationTarget,
- State = #vqstate {
- rates =
- #rates { avg_egress = AvgEgressRate,
- avg_ingress = AvgIngressRate },
- ack_rates =
- #rates { avg_egress = AvgAckEgressRate,
- avg_ingress = AvgAckIngressRate },
- target_ram_item_count = TargetRamItemCount }) ->
+set_ram_duration_target(
+ DurationTarget, State = #vqstate {
+ rates = #rates { avg_egress = AvgEgressRate,
+ avg_ingress = AvgIngressRate },
+ ack_rates = #rates { avg_egress = AvgAckEgressRate,
+ avg_ingress = AvgAckIngressRate },
+ target_ram_count = TargetRamCount }) ->
Rate =
AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate,
- TargetRamItemCount1 =
+ TargetRamCount1 =
case DurationTarget of
infinity -> infinity;
_ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec
end,
- State1 = State #vqstate { target_ram_item_count = TargetRamItemCount1 },
- a(case TargetRamItemCount1 == infinity orelse
- (TargetRamItemCount =/= infinity andalso
- TargetRamItemCount1 >= TargetRamItemCount) of
+ State1 = State #vqstate { target_ram_count = TargetRamCount1 },
+ a(case TargetRamCount1 == infinity orelse
+ (TargetRamCount =/= infinity andalso
+ TargetRamCount1 >= TargetRamCount) of
true -> State1;
false -> reduce_memory_use(State1)
end).
@@ -803,40 +827,39 @@ idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))).
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
-status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
- len = Len,
- pending_ack = PA,
- ram_ack_index = RAI,
- on_sync = #sync { funs = From },
- target_ram_item_count = TargetRamItemCount,
- ram_msg_count = RamMsgCount,
- ram_index_count = RamIndexCount,
- next_seq_id = NextSeqId,
- persistent_count = PersistentCount,
- rates = #rates {
- avg_egress = AvgEgressRate,
- avg_ingress = AvgIngressRate },
- ack_rates = #rates {
- avg_egress = AvgAckEgressRate,
- avg_ingress = AvgAckIngressRate } }) ->
- [ {q1 , queue:len(Q1)},
- {q2 , bpqueue:len(Q2)},
- {delta , Delta},
- {q3 , bpqueue:len(Q3)},
- {q4 , queue:len(Q4)},
- {len , Len},
- {pending_acks , dict:size(PA)},
- {outstanding_txns , length(From)},
- {target_ram_item_count , TargetRamItemCount},
- {ram_msg_count , RamMsgCount},
- {ram_ack_count , gb_trees:size(RAI)},
- {ram_index_count , RamIndexCount},
- {next_seq_id , NextSeqId},
- {persistent_count , PersistentCount},
- {avg_ingress_rate , AvgIngressRate},
- {avg_egress_rate , AvgEgressRate},
- {avg_ack_ingress_rate , AvgAckIngressRate},
- {avg_ack_egress_rate , AvgAckEgressRate} ].
+status(#vqstate {
+ q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
+ len = Len,
+ pending_ack = PA,
+ ram_ack_index = RAI,
+ on_sync = #sync { funs = From },
+ target_ram_count = TargetRamCount,
+ ram_msg_count = RamMsgCount,
+ ram_index_count = RamIndexCount,
+ next_seq_id = NextSeqId,
+ persistent_count = PersistentCount,
+ rates = #rates { avg_egress = AvgEgressRate,
+ avg_ingress = AvgIngressRate },
+ ack_rates = #rates { avg_egress = AvgAckEgressRate,
+ avg_ingress = AvgAckIngressRate } }) ->
+ [ {q1 , queue:len(Q1)},
+ {q2 , bpqueue:len(Q2)},
+ {delta , Delta},
+ {q3 , bpqueue:len(Q3)},
+ {q4 , queue:len(Q4)},
+ {len , Len},
+ {pending_acks , dict:size(PA)},
+ {outstanding_txns , length(From)},
+ {target_ram_count , TargetRamCount},
+ {ram_msg_count , RamMsgCount},
+ {ram_ack_count , gb_trees:size(RAI)},
+ {ram_index_count , RamIndexCount},
+ {next_seq_id , NextSeqId},
+ {persistent_count , PersistentCount},
+ {avg_ingress_rate , AvgIngressRate},
+ {avg_egress_rate , AvgEgressRate},
+ {avg_ack_ingress_rate, AvgAckIngressRate},
+ {avg_ack_egress_rate , AvgAckEgressRate} ].
%%----------------------------------------------------------------------------
%% Minor helpers
@@ -882,6 +905,10 @@ one_if(false) -> 0.
cons_if(true, E, L) -> [E | L];
cons_if(false, _E, L) -> L.
+gb_sets_maybe_insert(false, _Val, Set) -> Set;
+%% when requeueing, we re-add a guid to the unconfimred set
+gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
+
msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid },
MsgProps) ->
#msg_status { seq_id = SeqId, guid = Guid, msg = Msg,
@@ -903,8 +930,8 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
end),
Res.
-msg_store_client_init(MsgStore) ->
- rabbit_msg_store:client_init(MsgStore, rabbit_guid:guid()).
+msg_store_client_init(MsgStore, MsgOnDiskFun) ->
+ rabbit_msg_store:client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun).
msg_store_write(MSCState, IsPersistent, Guid, Msg) ->
with_immutable_msg_store_state(
@@ -1026,34 +1053,37 @@ init(IsDurable, IndexState, DeltaCount, Terms,
end,
Now = now(),
State = #vqstate {
- q1 = queue:new(),
- q2 = bpqueue:new(),
- delta = Delta,
- q3 = bpqueue:new(),
- q4 = queue:new(),
- next_seq_id = NextSeqId,
- pending_ack = dict:new(),
- ram_ack_index = gb_trees:empty(),
- index_state = IndexState1,
- msg_store_clients = {PersistentClient, TransientClient},
- on_sync = ?BLANK_SYNC,
- durable = IsDurable,
- transient_threshold = NextSeqId,
-
- len = DeltaCount1,
- persistent_count = DeltaCount1,
-
- target_ram_item_count = infinity,
- ram_msg_count = 0,
- ram_msg_count_prev = 0,
- ram_ack_count_prev = 0,
- ram_index_count = 0,
- out_counter = 0,
- in_counter = 0,
- ack_out_counter = 0,
- ack_in_counter = 0,
- rates = blank_rate(Now, DeltaCount1),
- ack_rates = blank_rate(Now, 0) },
+ q1 = queue:new(),
+ q2 = bpqueue:new(),
+ delta = Delta,
+ q3 = bpqueue:new(),
+ q4 = queue:new(),
+ next_seq_id = NextSeqId,
+ pending_ack = dict:new(),
+ ram_ack_index = gb_trees:empty(),
+ index_state = IndexState1,
+ msg_store_clients = {PersistentClient, TransientClient},
+ on_sync = ?BLANK_SYNC,
+ durable = IsDurable,
+ transient_threshold = NextSeqId,
+
+ len = DeltaCount1,
+ persistent_count = DeltaCount1,
+
+ target_ram_count = infinity,
+ ram_msg_count = 0,
+ ram_msg_count_prev = 0,
+ ram_ack_count_prev = 0,
+ ram_index_count = 0,
+ out_counter = 0,
+ in_counter = 0,
+ rates = blank_rate(Now, DeltaCount1),
+ msgs_on_disk = gb_sets:new(),
+ msg_indices_on_disk = gb_sets:new(),
+ unconfirmed = gb_sets:new(),
+ ack_out_counter = 0,
+ ack_in_counter = 0,
+ ack_rates = blank_rate(Now, 0) },
a(maybe_deltas_to_betas(State)).
blank_rate(Timestamp, IngressLength) ->
@@ -1079,7 +1109,7 @@ msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) ->
end.
remove_persistent_messages(Guids) ->
- PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE),
+ PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, undefined),
ok = rabbit_msg_store:remove(Guids, PersistentClient),
rabbit_msg_store:client_delete_and_terminate(PersistentClient).
@@ -1130,6 +1160,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
durable = IsDurable }) ->
PAcks = lists:append(SPAcks),
Acks = lists:append(SAcks),
+ {_Guids, NewState} = ack(Acks, State),
Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs),
{Msg, MsgProps} <- lists:reverse(PubsN)],
{SeqIds, State1 = #vqstate { index_state = IndexState }} =
@@ -1141,7 +1172,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
{SeqId, State3} =
publish(Msg, MsgProps, false, IsPersistent1, State2),
{cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
- end, {PAcks, ack(Acks, State)}, Pubs),
+ end, {PAcks, NewState}, Pubs),
IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
[ Fun() || Fun <- lists:reverse(SFuns) ],
reduce_memory_use(
@@ -1195,15 +1226,17 @@ sum_guids_by_store_to_len(LensByStore, GuidsByStore) ->
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------
-publish(Msg = #basic_message { is_persistent = IsPersistent },
- MsgProps, IsDelivered, MsgOnDisk,
+publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
+ MsgProps = #message_properties { needs_confirming = NeedsConfirming },
+ IsDelivered, MsgOnDisk,
State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
next_seq_id = SeqId,
len = Len,
in_counter = InCount,
persistent_count = PCount,
durable = IsDurable,
- ram_msg_count = RamMsgCount }) ->
+ ram_msg_count = RamMsgCount,
+ unconfirmed = Unconfirmed }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk},
@@ -1213,11 +1246,13 @@ publish(Msg = #basic_message { is_persistent = IsPersistent },
true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) }
end,
PCount1 = PCount + one_if(IsPersistent1),
+ Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed),
{SeqId, State2 #vqstate { next_seq_id = SeqId + 1,
len = Len + 1,
in_counter = InCount + 1,
persistent_count = PCount1,
- ram_msg_count = RamMsgCount + 1}}.
+ ram_msg_count = RamMsgCount + 1,
+ unconfirmed = Unconfirmed1 }}.
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
msg_on_disk = true }, _MSCState) ->
@@ -1310,7 +1345,7 @@ remove_pending_ack(KeepPersistent,
end.
ack(_MsgStoreFun, _Fun, [], State) ->
- State;
+ {[], State};
ack(MsgStoreFun, Fun, AckTags, State) ->
{{SeqIds, GuidsByStore},
State1 = #vqstate { index_state = IndexState,
@@ -1328,14 +1363,17 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
gb_trees:delete_any(SeqId, RAI)})}
end, {{[], orddict:new()}, State}, AckTags),
IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
- ok = orddict:fold(fun (IsPersistent, Guids, ok) ->
- MsgStoreFun(MSCState, IsPersistent, Guids)
- end, ok, GuidsByStore),
+ AckdGuids = lists:concat(
+ orddict:fold(
+ fun (IsPersistent, Guids, Gs) ->
+ MsgStoreFun(MSCState, IsPersistent, Guids),
+ [Guids | Gs]
+ end, [], GuidsByStore)),
PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
orddict:new(), GuidsByStore)),
- State1 #vqstate { index_state = IndexState1,
+ {AckdGuids, State1 #vqstate { index_state = IndexState1,
persistent_count = PCount1,
- ack_out_counter = AckOutCount + length(AckTags) }.
+ ack_out_counter = AckOutCount + length(AckTags) }}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
@@ -1352,6 +1390,44 @@ find_persistent_count(LensByStore) ->
end.
%%----------------------------------------------------------------------------
+%% Internal plumbing for confirms (aka publisher acks)
+%%----------------------------------------------------------------------------
+
+remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ State #vqstate { msgs_on_disk = gb_sets:difference(MOD, GuidSet),
+ msg_indices_on_disk = gb_sets:difference(MIOD, GuidSet),
+ unconfirmed = gb_sets:difference(UC, GuidSet) }.
+
+msgs_confirmed(GuidSet, State) ->
+ {{confirm, gb_sets:to_list(GuidSet)}, remove_confirms(GuidSet, State)}.
+
+msgs_written_to_disk(QPid, GuidSet) ->
+ rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
+ QPid, fun(State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ msgs_confirmed(gb_sets:intersection(GuidSet, MIOD),
+ State #vqstate {
+ msgs_on_disk =
+ gb_sets:intersection(
+ gb_sets:union(MOD, GuidSet), UC) })
+ end).
+
+msg_indices_written_to_disk(QPid, GuidSet) ->
+ rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
+ QPid, fun(State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ msgs_confirmed(gb_sets:intersection(GuidSet, MOD),
+ State #vqstate {
+ msg_indices_on_disk =
+ gb_sets:intersection(
+ gb_sets:union(MIOD, GuidSet), UC) })
+ end).
+
+%%----------------------------------------------------------------------------
%% Phase changes
%%----------------------------------------------------------------------------
@@ -1364,7 +1440,7 @@ find_persistent_count(LensByStore) ->
%% though the conversion function for that is called as necessary. The
%% reason is twofold. Firstly, this is safe because the conversion is
%% only ever necessary just after a transition to a
-%% target_ram_item_count of zero or after an incremental alpha->beta
+%% target_ram_count of zero or after an incremental alpha->beta
%% conversion. In the former case the conversion is performed straight
%% away (i.e. any betas present at the time are converted to deltas),
%% and in the latter case the need for a conversion is flagged up
@@ -1375,51 +1451,41 @@ find_persistent_count(LensByStore) ->
%% perpetually reporting the need for a conversion when no such
%% conversion is needed. That in turn could cause an infinite loop.
reduce_memory_use(_AlphaBetaFun, _BetaGammaFun, _BetaDeltaFun, _AckFun,
- State = #vqstate {target_ram_item_count = infinity}) ->
+ State = #vqstate {target_ram_count = infinity}) ->
{false, State};
reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun,
State = #vqstate {
- ram_ack_index = RamAckIndex,
- ram_msg_count = RamMsgCount,
- target_ram_item_count = TargetRamItemCount,
- rates = #rates {
- avg_ingress = AvgIngress,
- avg_egress = AvgEgress },
- ack_rates = #rates {
- avg_ingress = AvgAckIngress,
- avg_egress = AvgAckEgress } }) ->
+ ram_ack_index = RamAckIndex,
+ ram_msg_count = RamMsgCount,
+ target_ram_count = TargetRamCount,
+ rates = #rates { avg_ingress = AvgIngress,
+ avg_egress = AvgEgress },
+ ack_rates = #rates { avg_ingress = AvgAckIngress,
+ avg_egress = AvgAckEgress }
+ }) ->
{Reduce, State1} =
case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex),
- TargetRamItemCount) of
- 0 ->
- {false, State};
- S1 ->
- ReduceFuns =
- case (AvgAckIngress - AvgAckEgress) >
- (AvgIngress - AvgEgress) of
- true ->
- %% ACKs are growing faster than the queue,
- %% push messages from there first.
- [AckFun, AlphaBetaFun];
- false ->
- %% The queue is growing faster than the
- %% acks, push queue messages first.
- [AlphaBetaFun, AckFun]
- end,
- {_, State2} =
- %% Both reduce functions get a chance to reduce
- %% memory. The second may very well get a quota of
- %% 0 if the first function managed to push out the
- %% maximum number of messages.
- lists:foldl(
- fun (ReduceFun, {QuotaN, StateN}) ->
- ReduceFun(QuotaN, StateN)
- end, {S1, State}, ReduceFuns),
- {true, State2}
+ TargetRamCount) of
+ 0 -> {false, State};
+ %% Reduce memory of pending acks and alphas. The order is
+ %% determined based on which is growing faster. Whichever
+ %% comes second may very well get a quota of 0 if the
+ %% first manages to push out the max number of messages.
+ S1 -> {_, State2} =
+ lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
+ ReduceFun(QuotaN, StateN)
+ end,
+ {S1, State},
+ case (AvgAckIngress - AvgAckEgress) >
+ (AvgIngress - AvgEgress) of
+ true -> [AckFun, AlphaBetaFun];
+ false -> [AlphaBetaFun, AckFun]
+ end),
+ {true, State2}
end,
- case State1 #vqstate.target_ram_item_count of
+ case State1 #vqstate.target_ram_count of
0 -> {Reduce, BetaDeltaFun(State1)};
_ -> case chunk_size(State1 #vqstate.ram_index_count,
permitted_ram_index_count(State1)) of
@@ -1615,11 +1681,11 @@ maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) ->
maybe_push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
State = #vqstate {
- ram_msg_count = RamMsgCount,
- target_ram_item_count = TargetRamItemCount })
+ ram_msg_count = RamMsgCount,
+ target_ram_count = TargetRamCount })
when Quota =:= 0 orelse
- TargetRamItemCount =:= infinity orelse
- TargetRamItemCount >= RamMsgCount ->
+ TargetRamCount =:= infinity orelse
+ TargetRamCount >= RamMsgCount ->
{Quota, State};
maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
case Generator(Q) of
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 50bca390..068ac186 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -34,12 +34,11 @@
-include("rabbit_framing.hrl").
-export([start/5, start_link/5, mainloop/2, mainloop1/2]).
--export([send_command/2, send_command/3, send_command_sync/2,
- send_command_sync/3, send_command_and_notify/5]).
+-export([send_command/2, send_command/3,
+ send_command_sync/2, send_command_sync/3,
+ send_command_and_notify/4, send_command_and_notify/5]).
-export([internal_send_command/4, internal_send_command/6]).
--import(gen_tcp).
-
-record(wstate, {sock, channel, frame_max, protocol}).
-define(HIBERNATE_AFTER, 5000).
@@ -66,6 +65,9 @@
-spec(send_command_sync/3 ::
(pid(), rabbit_framing:amqp_method_record(), rabbit_types:content())
-> 'ok').
+-spec(send_command_and_notify/4 ::
+ (pid(), pid(), pid(), rabbit_framing:amqp_method_record())
+ -> 'ok').
-spec(send_command_and_notify/5 ::
(pid(), pid(), pid(), rabbit_framing:amqp_method_record(),
rabbit_types:content())
@@ -130,6 +132,10 @@ handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}},
ok = internal_send_command_async(MethodRecord, Content, State),
gen_server:reply(From, ok),
State;
+handle_message({send_command_and_notify, QPid, ChPid, MethodRecord}, State) ->
+ ok = internal_send_command_async(MethodRecord, State),
+ rabbit_amqqueue:notify_sent(QPid, ChPid),
+ State;
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
State) ->
ok = internal_send_command_async(MethodRecord, Content, State),
@@ -158,6 +164,10 @@ send_command_sync(W, MethodRecord) ->
send_command_sync(W, MethodRecord, Content) ->
call(W, {send_command_sync, MethodRecord, Content}).
+send_command_and_notify(W, Q, ChPid, MethodRecord) ->
+ W ! {send_command_and_notify, Q, ChPid, MethodRecord},
+ ok.
+
send_command_and_notify(W, Q, ChPid, MethodRecord, Content) ->
W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content},
ok.
@@ -170,7 +180,7 @@ call(Pid, Msg) ->
%---------------------------------------------------------------------------
-assemble_frames(Channel, MethodRecord, Protocol) ->
+assemble_frame(Channel, MethodRecord, Protocol) ->
?LOGMESSAGE(out, Channel, MethodRecord, none),
rabbit_binary_generator:build_simple_method_frame(
Channel, MethodRecord, Protocol).
@@ -185,17 +195,34 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) ->
Channel, Content, FrameMax, Protocol),
[MethodFrame | ContentFrames].
+%% We optimise delivery of small messages. Content-bearing methods
+%% require at least three frames. Small messages always fit into
+%% that. We hand their frames to the Erlang network functions in one
+%% go, which may lead to somewhat more efficient processing in the
+%% runtime and a greater chance of coalescing into fewer TCP packets.
+%%
+%% By contrast, for larger messages, split across many frames, we want
+%% to allow interleaving of frames on different channels. Hence we
+%% hand them to the Erlang network functions one frame at a time.
+send_frames(Fun, Sock, Frames) when length(Frames) =< 3 ->
+ Fun(Sock, Frames);
+send_frames(Fun, Sock, Frames) ->
+ lists:foldl(fun (Frame, ok) -> Fun(Sock, Frame);
+ (_Frame, Other) -> Other
+ end, ok, Frames).
+
tcp_send(Sock, Data) ->
rabbit_misc:throw_on_error(inet_error,
fun () -> rabbit_net:send(Sock, Data) end).
internal_send_command(Sock, Channel, MethodRecord, Protocol) ->
- ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, Protocol)).
+ ok = tcp_send(Sock, assemble_frame(Channel, MethodRecord, Protocol)).
internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax,
Protocol) ->
- ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord,
- Content, FrameMax, Protocol)).
+ ok = send_frames(fun tcp_send/2, Sock,
+ assemble_frames(Channel, MethodRecord,
+ Content, FrameMax, Protocol)).
%% gen_tcp:send/2 does a selective receive of {inet_reply, Sock,
%% Status} to obtain the result. That is bad when it is called from
@@ -219,19 +246,19 @@ internal_send_command_async(MethodRecord,
#wstate{sock = Sock,
channel = Channel,
protocol = Protocol}) ->
- true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, Protocol)),
- ok.
+ ok = port_cmd(Sock, assemble_frame(Channel, MethodRecord, Protocol)).
internal_send_command_async(MethodRecord, Content,
#wstate{sock = Sock,
channel = Channel,
frame_max = FrameMax,
protocol = Protocol}) ->
- true = port_cmd(Sock, assemble_frames(Channel, MethodRecord,
- Content, FrameMax, Protocol)),
- ok.
+ ok = send_frames(fun port_cmd/2, Sock,
+ assemble_frames(Channel, MethodRecord,
+ Content, FrameMax, Protocol)).
port_cmd(Sock, Data) ->
- try rabbit_net:port_command(Sock, Data)
- catch error:Error -> exit({writer, send_failed, Error})
- end.
+ true = try rabbit_net:port_command(Sock, Data)
+ catch error:Error -> exit({writer, send_failed, Error})
+ end,
+ ok.