summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-19 21:32:01 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-19 21:32:01 +0000
commitf27c502034c9e5218e280c4a39da88562b466f51 (patch)
tree59b17d795623020fc51080882dd0331f4af0ea20
parent005788d47882dade23b7c3b605bcafde4107222d (diff)
downloadrabbitmq-server-bug25394.tar.gz
populate slave's msg_id_ack with sync'ed messages pending ackbug25394
-rw-r--r--src/rabbit_mirror_queue_slave.erl9
-rw-r--r--src/rabbit_mirror_queue_sync.erl45
2 files changed, 30 insertions, 24 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index b63fccc9..cd2a8042 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -227,9 +227,12 @@ handle_cast({sync_start, Ref, Syncer},
backing_queue = BQ,
backing_queue_state = BQS }) ->
State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State),
- S = fun({TRefN, BQSN}) -> State1#state{depth_delta = undefined,
- rate_timer_ref = TRefN,
- backing_queue_state = BQSN} end,
+ S = fun({MA, TRefN, BQSN}) ->
+ State1#state{depth_delta = undefined,
+ msg_id_ack = dict:from_list(MA),
+ rate_timer_ref = TRefN,
+ backing_queue_state = BQSN}
+ end,
case rabbit_mirror_queue_sync:slave(
DD, Ref, TRef, Syncer, BQ, BQS,
fun (BQN, BQSN) ->
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index b023823e..b8cfe4a9 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -57,6 +57,9 @@
-type(log_fun() :: fun ((string(), [any()]) -> 'ok')).
-type(bq() :: atom()).
-type(bqs() :: any()).
+-type(ack() :: any()).
+-type(slave_sync_state() :: {[{rabbit_types:msg_id(), ack()}], timer:tref(),
+ bqs()}).
-spec(master_prepare/3 :: (reference(), log_fun(), [pid()]) -> pid()).
-spec(master_go/7 :: (pid(), reference(), log_fun(),
@@ -69,8 +72,8 @@
-spec(slave/7 :: (non_neg_integer(), reference(), timer:tref(), pid(),
bq(), bqs(), fun((bq(), bqs()) -> {timer:tref(), bqs()})) ->
'denied' |
- {'ok' | 'failed', {timer:tref(), bqs()}} |
- {'stop', any(), {timer:tref(), bqs()}}).
+ {'ok' | 'failed', slave_sync_state()} |
+ {'stop', any(), slave_sync_state()}).
-endif.
@@ -206,10 +209,10 @@ slave(_DD, Ref, TRef, Syncer, BQ, BQS, UpdateRamDuration) ->
Syncer ! {sync_ready, Ref, self()},
{_MsgCount, BQS1} = BQ:purge(BQ:purge_acks(BQS)),
slave_sync_loop({Ref, MRef, Syncer, BQ, UpdateRamDuration,
- rabbit_misc:get_parent()}, TRef, BQS1).
+ rabbit_misc:get_parent()}, {[], TRef, BQS1}).
slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent},
- TRef, BQS) ->
+ State = {MA, TRef, BQS}) ->
receive
{'DOWN', MRef, process, Syncer, _Reason} ->
%% If the master dies half way we are not in the usual
@@ -218,40 +221,40 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent},
%% sync with a newly promoted master, or even just receive
%% messages from it, we have a hole in the middle. So the
%% only thing to do here is purge.
- {_MsgCount, BQS1} = BQ:purge(BQS),
+ {_MsgCount, BQS1} = BQ:purge(BQ:purge_acks(BQS)),
credit_flow:peer_down(Syncer),
- {failed, {TRef, BQS1}};
+ {failed, {[], TRef, BQS1}};
{bump_credit, Msg} ->
credit_flow:handle_bump_msg(Msg),
- slave_sync_loop(Args, TRef, BQS);
+ slave_sync_loop(Args, State);
{sync_complete, Ref} ->
erlang:demonitor(MRef, [flush]),
credit_flow:peer_down(Syncer),
- {ok, {TRef, BQS}};
+ {ok, State};
{'$gen_cast', {set_maximum_since_use, Age}} ->
ok = file_handle_cache:set_maximum_since_use(Age),
- slave_sync_loop(Args, TRef, BQS);
+ slave_sync_loop(Args, State);
{'$gen_cast', {set_ram_duration_target, Duration}} ->
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
- slave_sync_loop(Args, TRef, BQS1);
+ slave_sync_loop(Args, {MA, TRef, BQS1});
update_ram_duration ->
{TRef1, BQS1} = UpdateRamDuration(BQ, BQS),
- slave_sync_loop(Args, TRef1, BQS1);
+ slave_sync_loop(Args, {MA, TRef1, BQS1});
{sync_msg, Ref, Msg, Props, Unacked} ->
credit_flow:ack(Syncer),
Props1 = Props#message_properties{needs_confirming = false},
- BQS1 = case Unacked of
- false -> BQ:publish(Msg, Props1, true, none, BQS);
- true -> {_AckTag, BQS2} = BQ:publish_delivered(
- Msg, Props1, none, BQS),
- %% TODO do something w AckTag
- BQS2
- end,
- slave_sync_loop(Args, TRef, BQS1);
+ {MA1, BQS1} =
+ case Unacked of
+ false -> {MA, BQ:publish(Msg, Props1, true, none, BQS)};
+ true -> {AckTag, BQS2} = BQ:publish_delivered(
+ Msg, Props1, none, BQS),
+ {[{Msg#basic_message.id, AckTag} | MA], BQS2}
+ end,
+ slave_sync_loop(Args, {MA1, TRef, BQS1});
{'EXIT', Parent, Reason} ->
- {stop, Reason, {TRef, BQS}};
+ {stop, Reason, State};
%% If the master throws an exception
{'$gen_cast', {gm, {delete_and_terminate, Reason}}} ->
BQ:delete_and_terminate(Reason, BQS),
- {stop, Reason, {TRef, undefined}}
+ {stop, Reason, {[], TRef, undefined}}
end.