diff options
Diffstat (limited to 'src/rabbit_mirror_queue_master.erl')
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 28 |
1 files changed, 21 insertions, 7 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 532911f2..ad5fd28f 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -25,7 +25,7 @@ -export([start/1, stop/0]). --export([promote_backing_queue_state/6, sender_death_fun/0]). +-export([promote_backing_queue_state/6, sender_death_fun/0, length_fun/0]). -behaviour(rabbit_backing_queue). @@ -44,9 +44,10 @@ -ifdef(use_specs). --export_type([death_fun/0]). +-export_type([death_fun/0, length_fun/0]). -type(death_fun() :: fun ((pid()) -> 'ok')). +-type(length_fun() :: fun (() -> 'ok')). -type(master_state() :: #state { gm :: pid(), coordinator :: pid(), backing_queue :: atom(), @@ -61,6 +62,7 @@ -spec(promote_backing_queue_state/6 :: (pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()). -spec(sender_death_fun/0 :: () -> death_fun()). +-spec(length_fun/0 :: () -> length_fun()). -endif. @@ -83,7 +85,7 @@ stop() -> init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, AsyncCallback) -> {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( - Q, undefined, sender_death_fun()), + Q, undefined, sender_death_fun(), length_fun()), GM = rabbit_mirror_queue_coordinator:get_gm(CPid), MNodes1 = (case MNodes of @@ -94,6 +96,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1], {ok, BQ} = application:get_env(backing_queue_module), BQS = BQ:init(Q, Recover, AsyncCallback), + ok = gm:broadcast(GM, {length, BQ:len(BQS)}), #state { gm = GM, coordinator = CPid, backing_queue = BQ, @@ -349,11 +352,13 @@ discard(Msg = #basic_message { id = MsgId }, ChPid, %% --------------------------------------------------------------------------- promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) -> + Len = BQ:len(BQS), + ok = gm:broadcast(GM, {length, Len}), #state { gm = GM, coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS, - set_delivered = BQ:len(BQS), + set_delivered = Len, seen_status = SeenStatus, confirmed = [], ack_msg_id = dict:new(), @@ -371,9 +376,18 @@ sender_death_fun() -> end) end. -%% --------------------------------------------------------------------------- -%% Helpers -%% --------------------------------------------------------------------------- +length_fun() -> + Self = self(), + fun () -> + rabbit_amqqueue:run_backing_queue( + Self, ?MODULE, + fun (?MODULE, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + ok = gm:broadcast(GM, {length, BQ:len(BQS)}), + State + end) + end. maybe_store_acktag(undefined, _MsgId, AM) -> AM; |