summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-11-26 10:44:36 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-11-26 10:44:36 +0000
commitab9ab903487244bcbcb50b982f1c44cfcfbe20f0 (patch)
tree6ab2313fa5a9ff9ddfb27b5b92bf0c56cee39e74
parentbf87506cea0640482af76fcb580229bed2e2c74e (diff)
downloadrabbitmq-server-bug25323.tar.gz
mark all messages enqueued in the slave as 'delivered'bug25323
which is a much better than the set_delivered logic, which we can now get rid of. In doing so it also becomes clear that having the 'delivered' flag in the #message_properties is less than ideal. It is mutable and we never bothered updating vq s.t. it sets the flag correctly. So lets get rid of it and add a parameter to bq:publish instead
-rw-r--r--include/rabbit.hrl3
-rw-r--r--src/rabbit_amqqueue_process.erl15
-rw-r--r--src/rabbit_backing_queue.erl6
-rw-r--r--src/rabbit_mirror_queue_master.erl39
-rw-r--r--src/rabbit_mirror_queue_slave.erl2
-rw-r--r--src/rabbit_tests.erl2
-rw-r--r--src/rabbit_variable_queue.erl30
7 files changed, 40 insertions, 57 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index b2832b45..0ccb80bf 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -78,8 +78,7 @@
-record(event, {type, props, timestamp}).
--record(message_properties, {expiry, needs_confirming = false,
- delivered = false}).
+-record(message_properties, {expiry, needs_confirming = false}).
-record(plugin, {name, %% atom()
version, %% string()
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5ddafba8..bfc0f418 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -540,8 +540,8 @@ run_message_queue(State) ->
State2.
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
- Props = #message_properties{delivered = Delivered},
- State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ Props, Delivered, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
case BQ:is_duplicate(Message, BQS) of
{false, BQS1} ->
deliver_msgs_to_consumers(
@@ -563,15 +563,15 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
Delivered, State) ->
{Confirm, State1} = send_or_record_confirm(Delivery, State),
- Props = message_properties(Message, Confirm, Delivered, State),
- case attempt_delivery(Delivery, Props, State1) of
+ Props = message_properties(Message, Confirm, State),
+ case attempt_delivery(Delivery, Props, Delivered, State1) of
{true, State2} ->
State2;
%% The next one is an optimisation
{false, State2 = #q{ttl = 0, dlx = undefined}} ->
discard(Delivery, State2);
{false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
- BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
+ BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
ensure_ttl_timer(Props#message_properties.expiry,
State2#q{backing_queue_state = BQS1})
end.
@@ -704,10 +704,9 @@ subtract_acks(ChPid, AckTags, State, Fun) ->
Fun(State)
end.
-message_properties(Message, Confirm, Delivered, #q{ttl = TTL}) ->
+message_properties(Message, Confirm, #q{ttl = TTL}) ->
#message_properties{expiry = calculate_msg_expiry(Message, TTL),
- needs_confirming = Confirm == eventually,
- delivered = Delivered}.
+ needs_confirming = Confirm == eventually}.
calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
#content{properties = Props} =
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 9e99ca5e..26c63b08 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -78,8 +78,8 @@
%% Publish a message.
-callback publish(rabbit_types:basic_message(),
- rabbit_types:message_properties(), pid(), state()) ->
- state().
+ rabbit_types:message_properties(), boolean(), pid(),
+ state()) -> state().
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
@@ -219,7 +219,7 @@
behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
- {delete_and_terminate, 2}, {purge, 1}, {publish, 4},
+ {delete_and_terminate, 2}, {purge, 1}, {publish, 5},
{publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3},
{fetch, 2}, {ack, 2}, {foreach_ack, 3}, {requeue, 2}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 8fcd1893..c8a361b1 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -17,7 +17,7 @@
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/4, publish_delivered/4,
+ purge/1, publish/5, publish_delivered/4,
discard/3, fetch/2, drop/2, ack/2,
requeue/2, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/3, set_ram_duration_target/2, ram_duration/1,
@@ -38,7 +38,6 @@
coordinator,
backing_queue,
backing_queue_state,
- set_delivered,
seen_status,
confirmed,
ack_msg_id,
@@ -55,7 +54,6 @@
coordinator :: pid(),
backing_queue :: atom(),
backing_queue_state :: any(),
- set_delivered :: non_neg_integer(),
seen_status :: dict(),
confirmed :: [rabbit_guid:guid()],
ack_msg_id :: dict(),
@@ -114,7 +112,6 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = 0,
seen_status = dict:new(),
confirmed = [],
ack_msg_id = dict:new(),
@@ -136,8 +133,8 @@ terminate({shutdown, dropped} = Reason,
%% in without this node being restarted. Thus we must do the full
%% blown delete_and_terminate now, but only locally: we do not
%% broadcast delete_and_terminate.
- State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
- set_delivered = 0 };
+ State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)};
+
terminate(Reason,
State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
%% Backing queue termination. The queue is going down but
@@ -148,8 +145,7 @@ terminate(Reason,
delete_and_terminate(Reason, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
stop_all_slaves(Reason, State),
- State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
- set_delivered = 0 }.
+ State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}.
stop_all_slaves(Reason, #state{gm = GM}) ->
Info = gm:info(GM),
@@ -175,17 +171,16 @@ purge(State = #state { gm = GM,
backing_queue_state = BQS }) ->
ok = gm:broadcast(GM, {drop, 0, BQ:len(BQS), false}),
{Count, BQS1} = BQ:purge(BQS),
- {Count, State #state { backing_queue_state = BQS1,
- set_delivered = 0 }}.
+ {Count, State #state { backing_queue_state = BQS1 }}.
-publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid,
+publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid,
State = #state { gm = GM,
seen_status = SS,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}),
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQS),
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
@@ -224,7 +219,6 @@ discard(MsgId, ChPid, State = #state { gm = GM,
dropwhile(Pred, AckRequired,
State = #state{gm = GM,
backing_queue = BQ,
- set_delivered = SetDelivered,
backing_queue_state = BQS }) ->
Len = BQ:len(BQS),
{Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
@@ -234,9 +228,7 @@ dropwhile(Pred, AckRequired,
0 -> ok;
_ -> ok = gm:broadcast(GM, {drop, Len1, Dropped, AckRequired})
end,
- SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
- {Next, Msgs, State #state { backing_queue_state = BQS1,
- set_delivered = SetDelivered1 } }.
+ {Next, Msgs, State #state { backing_queue_state = BQS1 } }.
drain_confirmed(State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -269,16 +261,14 @@ drain_confirmed(State = #state { backing_queue = BQ,
confirmed = [] }}.
fetch(AckRequired, State = #state { backing_queue = BQ,
- backing_queue_state = BQS,
- set_delivered = SetDelivered }) ->
+ backing_queue_state = BQS }) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
State1 = State #state { backing_queue_state = BQS1 },
case Result of
empty ->
{Result, State1};
- {Message, IsDelivered, AckTag} ->
- {{Message, IsDelivered orelse SetDelivered > 0, AckTag},
- drop(Message#basic_message.id, AckTag, State1)}
+ {#basic_message{id = MsgId}, _IsDelivered, AckTag} ->
+ {Result, drop(MsgId, AckTag, State1)}
end.
drop(AckRequired, State = #state { backing_queue = BQ,
@@ -416,7 +406,6 @@ promote_backing_queue_state(CPid, BQ, BQS, GM, AckTags, SeenStatus, KS) ->
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS1,
- set_delivered = Len,
seen_status = SeenStatus,
confirmed = [],
ack_msg_id = dict:new(),
@@ -451,14 +440,12 @@ depth_fun() ->
%% Helpers
%% ---------------------------------------------------------------------------
-drop(MsgId, AckTag, State = #state { set_delivered = SetDelivered,
- ack_msg_id = AM,
+drop(MsgId, AckTag, State = #state { ack_msg_id = AM,
gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckTag =/= undefined}),
- State #state { set_delivered = lists:max([0, SetDelivered - 1]),
- ack_msg_id = maybe_store_acktag(AckTag, MsgId, AM) }.
+ State #state { ack_msg_id = maybe_store_acktag(AckTag, MsgId, AM) }.
maybe_store_acktag(undefined, _MsgId, AM) -> AM;
maybe_store_acktag(AckTag, MsgId, AM) -> dict:store(AckTag, MsgId, AM).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index cb7a2135..752dac89 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -703,7 +703,7 @@ process_instruction({publish, ChPid, MsgProps,
Msg = #basic_message { id = MsgId }}, State) ->
State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
publish_or_discard(published, ChPid, MsgId, State),
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ BQS1 = BQ:publish(Msg, MsgProps, true, ChPid, BQS),
{ok, State1 #state { backing_queue_state = BQS1 }};
process_instruction({publish_delivered, ChPid, MsgProps,
Msg = #basic_message { id = MsgId }}, State) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 81180ebe..4a989424 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2230,7 +2230,7 @@ variable_queue_publish(IsPersistent, Count, PropFun, PayloadFun, VQ) ->
false -> 1
end},
PayloadFun(N)),
- PropFun(N, #message_properties{}), self(), VQN)
+ PropFun(N, #message_properties{}), false, self(), VQN)
end, VQ, lists:seq(1, Count)).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index e2566e10..be340cdd 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -17,7 +17,7 @@
-module(rabbit_variable_queue).
-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
- publish/4, publish_delivered/4, discard/3, drain_confirmed/1,
+ publish/5, publish_delivered/4, discard/3, drain_confirmed/1,
dropwhile/3, fetch/2, drop/2, ack/2, requeue/2, fold/3, len/1,
is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
@@ -520,16 +520,16 @@ purge(State = #vqstate { q4 = Q4,
publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
MsgProps = #message_properties { needs_confirming = NeedsConfirming },
- _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
- next_seq_id = SeqId,
- len = Len,
- in_counter = InCount,
- persistent_count = PCount,
- durable = IsDurable,
- ram_msg_count = RamMsgCount,
- unconfirmed = UC }) ->
+ IsDelivered, _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
+ next_seq_id = SeqId,
+ len = Len,
+ in_counter = InCount,
+ persistent_count = PCount,
+ durable = IsDurable,
+ ram_msg_count = RamMsgCount,
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = msg_status(IsPersistent1, SeqId, Msg, MsgProps),
+ MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps),
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = case ?QUEUE:is_empty(Q3) of
false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
@@ -556,8 +556,7 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
durable = IsDurable,
unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
- MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
- #msg_status { is_delivered = true },
+ MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps),
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
State2 = record_pending_ack(m(MsgStatus1), State1),
PCount1 = PCount + one_if(IsPersistent1),
@@ -891,11 +890,10 @@ gb_sets_maybe_insert(false, _Val, Set) -> Set;
%% when requeueing, we re-add a msg_id to the unconfirmed set
gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set).
-msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId },
- MsgProps = #message_properties { delivered = Delivered }) ->
- %% TODO would it make sense to remove #msg_status.is_delivered?
+msg_status(IsPersistent, IsDelivered, SeqId,
+ Msg = #basic_message { id = MsgId }, MsgProps) ->
#msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg,
- is_persistent = IsPersistent, is_delivered = Delivered,
+ is_persistent = IsPersistent, is_delivered = IsDelivered,
msg_on_disk = false, index_on_disk = false,
msg_props = MsgProps }.