summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-07-27 12:24:43 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-07-27 12:24:43 +0100
commit641216a1c9710ac51ecd6d0b0a6bd28f041e8876 (patch)
treef09ce5586537dc2d7d6c7128e3143e15e3ca1e4d
parent9433b14ae3f0e864895b0b02837b7598cadfe32c (diff)
parenta1c74bfcded0120ab3557d27b96371efe6bf8de5 (diff)
downloadrabbitmq-server-641216a1c9710ac51ecd6d0b0a6bd28f041e8876.tar.gz
Merging default into bug24130
-rw-r--r--docs/rabbitmqctl.1.xml12
-rw-r--r--src/rabbit_amqqueue_process.erl65
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl69
-rw-r--r--src/rabbit_mirror_queue_master.erl28
-rw-r--r--src/rabbit_mirror_queue_slave.erl110
5 files changed, 218 insertions, 66 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index ee000215..74041969 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -860,6 +860,18 @@
<listitem><para>Bytes of memory consumed by the Erlang process associated with the
queue, including stack, heap and internal structures.</para></listitem>
</varlistentry>
+ <varlistentry>
+ <term>mirror_nodes</term>
+ <listitem><para>If the queue is mirrored, this provides the names of the nodes upon which mirrors will be present should those nodes be part of the current cluster (i.e. it may contain node names that are not currently part of the cluster).</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>slave_pids</term>
+ <listitem><para>If the queue is mirrored, this gives the IDs of the current slaves.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>synchronised_slave_pids</term>
+ <listitem><para>If the queue is mirrored, this gives the IDs of the current slaves which are synchronised with the master.</para></listitem>
+ </varlistentry>
</variablelist>
<para>
If no <command>queueinfoitem</command>s are specified then queue name and depth are
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index fcd6cc24..a8bb8312 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -73,8 +73,8 @@
messages,
consumers,
memory,
- backing_queue_status,
- slave_pids
+ slave_pids,
+ backing_queue_status
]).
-define(CREATION_EVENT_KEYS,
@@ -84,10 +84,13 @@
auto_delete,
arguments,
owner_pid,
- mirror_nodes
+ mirror_nodes,
+ slave_pids,
+ synchronised_slave_pids
]).
--define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+-define(INFO_KEYS,
+ ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid, slave_pids]).
%%----------------------------------------------------------------------------
@@ -709,7 +712,40 @@ ensure_ttl_timer(State) ->
now_micros() -> timer:now_diff(now(), {0,0,0}).
-infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+infos(Items, State) ->
+ {Prefix, Items1} =
+ case lists:member(synchronised_slave_pids, Items) of
+ true -> Prefix1 = slaves_status(State),
+ case lists:member(slave_pids, Items) of
+ true -> {Prefix1, Items -- [slave_pids]};
+ false -> {proplists:delete(slave_pids, Prefix1), Items}
+ end;
+ false -> {[], Items}
+ end,
+ Prefix ++ [{Item, i(Item, State)}
+ || Item <- (Items1 -- [synchronised_slave_pids])].
+
+slaves_status(#q{q = #amqqueue{name = Name}}) ->
+ {ok, #amqqueue{mirror_nodes = MNodes, slave_pids = SPids}} =
+ rabbit_amqqueue:lookup(Name),
+ case MNodes of
+ undefined ->
+ [{slave_pids, ''}, {synchronised_slave_pids, ''}];
+ _ ->
+ {Results, _Bad} =
+ delegate:invoke(
+ SPids, fun (Pid) -> rabbit_mirror_queue_slave:info(Pid) end),
+ {SPids1, SSPids} =
+ lists:foldl(
+ fun ({Pid, Infos}, {SPidsN, SSPidsN}) ->
+ {[Pid | SPidsN],
+ case proplists:get_bool(is_synchronised, Infos) of
+ true -> [Pid | SSPidsN];
+ false -> SSPidsN
+ end}
+ end, {[], []}, Results),
+ [{slave_pids, SPids1}, {synchronised_slave_pids, SSPids}]
+ end.
i(name, #q{q = #amqqueue{name = Name}}) -> Name;
i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable;
@@ -741,14 +777,21 @@ i(consumers, State) ->
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
-i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
- BQ:status(BQS);
-i(slave_pids, #q{q = #amqqueue{name = Name}}) ->
- {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(Name),
- SPids;
i(mirror_nodes, #q{q = #amqqueue{name = Name}}) ->
{ok, #amqqueue{mirror_nodes = MNodes}} = rabbit_amqqueue:lookup(Name),
- MNodes;
+ case MNodes of
+ undefined -> '';
+ _ -> MNodes
+ end;
+i(slave_pids, #q{q = #amqqueue{name = Name}}) ->
+ {ok, #amqqueue{mirror_nodes = MNodes,
+ slave_pids = SPids}} = rabbit_amqqueue:lookup(Name),
+ case MNodes of
+ undefined -> '';
+ _ -> SPids
+ end;
+i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
+ BQ:status(BQS);
i(Item, _) ->
throw({bad_argument, Item}).
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index f6664a27..0b9f053f 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -16,7 +16,7 @@
-module(rabbit_mirror_queue_coordinator).
--export([start_link/3, get_gm/1, ensure_monitoring/2]).
+-export([start_link/4, get_gm/1, ensure_monitoring/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
@@ -32,15 +32,17 @@
-record(state, { q,
gm,
monitors,
- death_fun
+ death_fun,
+ length_fun
}).
-define(ONE_SECOND, 1000).
-ifdef(use_specs).
--spec(start_link/3 :: (rabbit_types:amqqueue(), pid() | 'undefined',
- rabbit_mirror_queue_master:death_fun()) ->
+-spec(start_link/4 :: (rabbit_types:amqqueue(), pid() | 'undefined',
+ rabbit_mirror_queue_master:death_fun(),
+ rabbit_mirror_queue_master:length_fun()) ->
rabbit_types:ok_pid_or_error()).
-spec(get_gm/1 :: (pid()) -> pid()).
-spec(ensure_monitoring/2 :: (pid(), [pid()]) -> 'ok').
@@ -138,9 +140,28 @@
%% state of the master. The detection of the sync-status of a slave is
%% done entirely based on length: if the slave and the master both
%% agree on the length of the queue after the fetch of the head of the
-%% queue, then the queues must be in sync. The only other possibility
-%% is that the slave's queue is shorter, and thus the fetch should be
-%% ignored.
+%% queue (or a 'set_length' results in a slave having to drop some
+%% messages from the head of its queue), then the queues must be in
+%% sync. The only other possibility is that the slave's queue is
+%% shorter, and thus the fetch should be ignored. In case slaves are
+%% joined to an empty queue which only goes on to receive publishes,
+%% they start by asking the master to broadcast its length. This is
+%% enough for slaves to always be able to work out when their head
+%% does not differ from the master (and is much simpler and cheaper
+%% than getting the master to hang on to the guid of the msg at the
+%% head of its queue). When a slave is promoted to a master, it
+%% unilaterally broadcasts its length, in order to solve the problem
+%% of length requests from new slaves being unanswered by a dead
+%% master.
+%%
+%% Obviously, due to the async nature of communication across gm, the
+%% slaves can fall behind. This does not matter from a sync pov: if
+%% they fall behind and the master dies then a) no publishes are lost
+%% because all publishes go to all mirrors anyway; b) the worst that
+%% happens is that acks get lost and so messages come back to
+%% life. This is no worse than normal given you never get confirmation
+%% that an ack has been received (not quite true with QoS-prefetch,
+%% but close enough for jazz).
%%
%% Because acktags are issued by the bq independently, and because
%% there is no requirement for the master and all slaves to use the
@@ -279,8 +300,8 @@
%%
%%----------------------------------------------------------------------------
-start_link(Queue, GM, DeathFun) ->
- gen_server2:start_link(?MODULE, [Queue, GM, DeathFun], []).
+start_link(Queue, GM, DeathFun, LengthFun) ->
+ gen_server2:start_link(?MODULE, [Queue, GM, DeathFun, LengthFun], []).
get_gm(CPid) ->
gen_server2:call(CPid, get_gm, infinity).
@@ -292,7 +313,7 @@ ensure_monitoring(CPid, Pids) ->
%% gen_server
%% ---------------------------------------------------------------------------
-init([#amqqueue { name = QueueName } = Q, GM, DeathFun]) ->
+init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) ->
GM1 = case GM of
undefined ->
{ok, GM2} = gm:start_link(QueueName, ?MODULE, [self()]),
@@ -306,10 +327,11 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun]) ->
end,
{ok, _TRef} =
timer:apply_interval(?ONE_SECOND, gm, broadcast, [GM1, heartbeat]),
- {ok, #state { q = Q,
- gm = GM1,
- monitors = dict:new(),
- death_fun = DeathFun },
+ {ok, #state { q = Q,
+ gm = GM1,
+ monitors = dict:new(),
+ death_fun = DeathFun,
+ length_fun = LengthFun },
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -329,6 +351,10 @@ handle_cast({gm_deaths, Deaths},
{stop, normal, State}
end;
+handle_cast(request_length, State = #state { length_fun = LengthFun }) ->
+ ok = LengthFun(),
+ noreply(State);
+
handle_cast({ensure_monitoring, Pids},
State = #state { monitors = Monitors }) ->
Monitors1 =
@@ -343,13 +369,12 @@ handle_cast({ensure_monitoring, Pids},
handle_info({'DOWN', _MonitorRef, process, Pid, _Reason},
State = #state { monitors = Monitors,
- death_fun = Fun }) ->
- noreply(
- case dict:is_key(Pid, Monitors) of
- false -> State;
- true -> ok = Fun(Pid),
- State #state { monitors = dict:erase(Pid, Monitors) }
- end);
+ death_fun = DeathFun }) ->
+ noreply(case dict:is_key(Pid, Monitors) of
+ false -> State;
+ true -> ok = DeathFun(Pid),
+ State #state { monitors = dict:erase(Pid, Monitors) }
+ end);
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
@@ -379,6 +404,8 @@ members_changed([CPid], _Births, Deaths) ->
handle_msg([_CPid], _From, heartbeat) ->
ok;
+handle_msg([CPid], _From, request_length = Msg) ->
+ ok = gen_server2:cast(CPid, Msg);
handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) ->
ok = gen_server2:cast(CPid, Msg);
handle_msg([_CPid], _From, _Msg) ->
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 532911f2..ad5fd28f 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -25,7 +25,7 @@
-export([start/1, stop/0]).
--export([promote_backing_queue_state/6, sender_death_fun/0]).
+-export([promote_backing_queue_state/6, sender_death_fun/0, length_fun/0]).
-behaviour(rabbit_backing_queue).
@@ -44,9 +44,10 @@
-ifdef(use_specs).
--export_type([death_fun/0]).
+-export_type([death_fun/0, length_fun/0]).
-type(death_fun() :: fun ((pid()) -> 'ok')).
+-type(length_fun() :: fun (() -> 'ok')).
-type(master_state() :: #state { gm :: pid(),
coordinator :: pid(),
backing_queue :: atom(),
@@ -61,6 +62,7 @@
-spec(promote_backing_queue_state/6 ::
(pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()).
-spec(sender_death_fun/0 :: () -> death_fun()).
+-spec(length_fun/0 :: () -> length_fun()).
-endif.
@@ -83,7 +85,7 @@ stop() ->
init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
AsyncCallback) ->
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
- Q, undefined, sender_death_fun()),
+ Q, undefined, sender_death_fun(), length_fun()),
GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
MNodes1 =
(case MNodes of
@@ -94,6 +96,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
[rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1],
{ok, BQ} = application:get_env(backing_queue_module),
BQS = BQ:init(Q, Recover, AsyncCallback),
+ ok = gm:broadcast(GM, {length, BQ:len(BQS)}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -349,11 +352,13 @@ discard(Msg = #basic_message { id = MsgId }, ChPid,
%% ---------------------------------------------------------------------------
promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) ->
+ Len = BQ:len(BQS),
+ ok = gm:broadcast(GM, {length, Len}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = BQ:len(BQS),
+ set_delivered = Len,
seen_status = SeenStatus,
confirmed = [],
ack_msg_id = dict:new(),
@@ -371,9 +376,18 @@ sender_death_fun() ->
end)
end.
-%% ---------------------------------------------------------------------------
-%% Helpers
-%% ---------------------------------------------------------------------------
+length_fun() ->
+ Self = self(),
+ fun () ->
+ rabbit_amqqueue:run_backing_queue(
+ Self, ?MODULE,
+ fun (?MODULE, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:broadcast(GM, {length, BQ:len(BQS)}),
+ State
+ end)
+ end.
maybe_store_acktag(undefined, _MsgId, AM) ->
AM;
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index b38a8967..d32977ae 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -33,7 +33,7 @@
%% All instructions from the GM group must be processed in the order
%% in which they're received.
--export([start_link/1, set_maximum_since_use/2]).
+-export([start_link/1, set_maximum_since_use/2, info/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, handle_pre_hibernate/1, prioritise_call/3,
@@ -47,6 +47,15 @@
-include("rabbit.hrl").
-include("gm_specs.hrl").
+-define(CREATION_EVENT_KEYS,
+ [pid,
+ name,
+ master_pid,
+ is_synchronised
+ ]).
+
+-define(INFO_KEYS, ?CREATION_EVENT_KEYS).
+
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
-define(DEATH_TIMEOUT, 20000). %% 20 seconds
@@ -64,7 +73,9 @@
ack_num,
msg_id_status,
- known_senders
+ known_senders,
+
+ synchronised
}).
start_link(Q) ->
@@ -73,6 +84,9 @@ start_link(Q) ->
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
+info(QPid) ->
+ gen_server2:call(QPid, info, infinity).
+
init([#amqqueue { name = QueueName } = Q]) ->
process_flag(trap_exit, true), %% amqqueue_process traps exits too.
{ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
@@ -96,26 +110,32 @@ init([#amqqueue { name = QueueName } = Q]) ->
end),
erlang:monitor(process, MPid),
ok = file_handle_cache:register_callback(
- rabbit_amqqueue, set_maximum_since_use, [self()]),
+ rabbit_amqqueue, set_maximum_since_use, [Self]),
ok = rabbit_memory_monitor:register(
- self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}),
+ Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
{ok, BQ} = application:get_env(backing_queue_module),
BQS = bq_init(BQ, Q, false),
- {ok, #state { q = Q,
- gm = GM,
- master_pid = MPid,
- backing_queue = BQ,
- backing_queue_state = BQS,
- rate_timer_ref = undefined,
- sync_timer_ref = undefined,
-
- sender_queues = dict:new(),
- msg_id_ack = dict:new(),
- ack_num = 0,
-
- msg_id_status = dict:new(),
- known_senders = dict:new()
- }, hibernate,
+ State = #state { q = Q,
+ gm = GM,
+ master_pid = MPid,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = undefined,
+ sync_timer_ref = undefined,
+
+ sender_queues = dict:new(),
+ msg_id_ack = dict:new(),
+ ack_num = 0,
+
+ msg_id_status = dict:new(),
+ known_senders = dict:new(),
+
+ synchronised = false
+ },
+ rabbit_event:notify(queue_slave_created,
+ infos(?CREATION_EVENT_KEYS, State)),
+ ok = gm:broadcast(GM, request_length),
+ {ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) ->
@@ -167,7 +187,10 @@ handle_call({gm_deaths, Deaths}, From,
{error, not_found} ->
gen_server2:reply(From, ok),
{stop, normal, State}
- end.
+ end;
+
+handle_call(info, _From, State) ->
+ reply(infos(?INFO_KEYS, State), State).
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
@@ -291,6 +314,9 @@ members_changed([SPid], _Births, Deaths) ->
handle_msg([_SPid], _From, heartbeat) ->
ok;
+handle_msg([_SPid], _From, request_length) ->
+ %% This is only of value to the master
+ ok;
handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) ->
%% This is only of value to the master
ok;
@@ -315,6 +341,14 @@ inform_deaths(SPid, Deaths) ->
%% Others
%% ---------------------------------------------------------------------------
+infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+
+i(pid, _State) -> self();
+i(name, #state { q = #amqqueue { name = Name } }) -> Name;
+i(master_pid, #state { master_pid = MPid }) -> MPid;
+i(is_synchronised, #state { synchronised = Synchronised }) -> Synchronised;
+i(Item, _State) -> throw({bad_argument, Item}).
+
bq_init(BQ, Q, Recover) ->
Self = self(),
BQ:init(Q, Recover,
@@ -380,7 +414,7 @@ gb_trees_cons(Key, Value, Tree) ->
handle_process_result({ok, State}) -> noreply(State);
handle_process_result({stop, State}) -> {stop, normal, State}.
-promote_me(From, #state { q = Q,
+promote_me(From, #state { q = Q = #amqqueue { name = QName },
gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
@@ -388,13 +422,16 @@ promote_me(From, #state { q = Q,
sender_queues = SQ,
msg_id_ack = MA,
msg_id_status = MS,
- known_senders = KS }) ->
+ known_senders = KS,
+ master_pid = MPid}) ->
+ rabbit_event:notify(queue_slave_promoted, [{pid, self()},
+ {old_pid, MPid}]),
rabbit_log:info("Mirrored-queue (~s): Promoting slave ~s to master~n",
- [rabbit_misc:rs(Q #amqqueue.name),
- rabbit_misc:pid_to_string(self())]),
+ [rabbit_misc:rs(QName), rabbit_misc:pid_to_string(self())]),
Q1 = Q #amqqueue { pid = self() },
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
- Q1, GM, rabbit_mirror_queue_master:sender_death_fun()),
+ Q1, GM, rabbit_mirror_queue_master:sender_death_fun(),
+ rabbit_mirror_queue_master:length_fun()),
true = unlink(GM),
gen_server2:reply(From, {promote, CPid}),
ok = gm:confirmed_broadcast(GM, heartbeat),
@@ -748,7 +785,7 @@ process_instruction({set_length, Length},
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
ToDrop = QLen - Length,
- {ok, case ToDrop > 0 of
+ {ok, case ToDrop >= 0 of
true -> BQS1 =
lists:foldl(
fun (const, BQSN) ->
@@ -756,7 +793,8 @@ process_instruction({set_length, Length},
BQSN1} = BQ:fetch(false, BQSN),
BQSN1
end, BQS, lists:duplicate(ToDrop, const)),
- State #state { backing_queue_state = BQS1 };
+ set_synchronised(
+ true, State #state { backing_queue_state = BQS1 });
false -> State
end};
process_instruction({fetch, AckRequired, MsgId, Remaining},
@@ -769,6 +807,8 @@ process_instruction({fetch, AckRequired, MsgId, Remaining},
AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
maybe_store_ack(AckRequired, MsgId, AckTag,
State #state { backing_queue_state = BQS1 });
+ Other when Other + 1 =:= Remaining ->
+ set_synchronised(true, State);
Other when Other < Remaining ->
%% we must be shorter than the master
State
@@ -821,6 +861,10 @@ process_instruction({sender_death, ChPid},
msg_id_status = MS1,
known_senders = dict:erase(ChPid, KS) }
end};
+process_instruction({length, Length},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {ok, set_synchronised(Length =:= BQ:len(BQS), State)};
process_instruction({delete_and_terminate, Reason},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -848,3 +892,15 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
ack_num = Num }) ->
State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
ack_num = Num + 1 }.
+
+%% We intentionally leave out the head where a slave becomes
+%% unsynchronised: we assert that can never happen.
+set_synchronised(true, State = #state { master_pid = MasterPid,
+ synchronised = false }) ->
+ rabbit_event:notify(queue_slave_synchronised, [{master_pid, MasterPid},
+ {pid, self()}]),
+ State #state { synchronised = true };
+set_synchronised(true, State) ->
+ State;
+set_synchronised(false, State = #state { synchronised = false }) ->
+ State.