summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2013-03-15 17:38:37 +0000
committerEmile Joubert <emile@rabbitmq.com>2013-03-15 17:38:37 +0000
commit1e4232307730f751b0bb4bc0adbf125a7ea35af1 (patch)
treeae592ebba86d40367f5511cdd5918c9d74ab67f8
parent41f7641bd9d81ee6f45340dd6b5f6549e42e705d (diff)
downloadrabbitmq-server-bug25287.tar.gz
First draft of last value cachebug25287
-rw-r--r--src/rabbit_amqqueue_process.erl42
-rw-r--r--src/rabbit_exchange_decorator_lvc.erl111
2 files changed, 147 insertions, 6 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 18b641d4..b09a34ec 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -813,7 +813,7 @@ dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
X, RK, SeqNo, QName) of
[] -> {[AckTag | AckImm], SeqNo, UC, QMons};
QPids -> {AckImm, SeqNo + 1,
- dtree:insert(SeqNo, QPids, AckTag, UC),
+ dtree:insert(SeqNo, QPids, {ack, AckTag}, UC),
pmon:monitor_all(QPids, QMons)}
end
end, {[], SeqNo0, UC0, QMons0}, BQS),
@@ -861,11 +861,17 @@ stop(From, Reply, State = #q{unconfirmed = UC}) ->
{false, _} -> noreply(State#q{delayed_stop = {From, Reply}})
end.
-cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
+cleanup_after_confirm(Actions, State = #q{delayed_stop = DS,
unconfirmed = UC,
backing_queue = BQ,
backing_queue_state = BQS}) ->
- {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ Acks = lists:foldl(fun ({reply, Reply, From}, Acc) ->
+ gen_server2:reply(From, Reply),
+ Acc;
+ ({ack, Ack}, Acc) ->
+ [Ack | Acc]
+ end, [], Actions),
+ {_Guids, BQS1} = BQ:ack(Acks, BQS),
State1 = State#q{backing_queue_state = BQS1},
case dtree:is_empty(UC) andalso DS =/= undefined of
true -> case DS of
@@ -1239,16 +1245,40 @@ handle_call(force_event_refresh, _From,
{Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State),
emit_consumer_created(Ch, CTag, true, AckRequired, QName)
end,
- reply(ok, State).
+ reply(ok, State);
+
+handle_call({copy, DestQName}, From, State = #q{backing_queue = BQ,
+ publish_seqno = SeqNo0,
+ queue_monitors = QMons0,
+ unconfirmed = UC0,
+ backing_queue_state = BQS0}) ->
+ {ok, #amqqueue{pid = DestQPid} = DestQ} = rabbit_amqqueue:lookup(DestQName),
+ {SeqNo1, BQS1} =
+ BQ:fold(fun (Msg, _Props, _Unacked, SeqNo) ->
+ Delivery = rabbit_basic:delivery(false, Msg, SeqNo),
+ rabbit_amqqueue:deliver([DestQ], Delivery),
+ {cont, SeqNo + 1}
+ end, SeqNo0, BQS0),
+ case SeqNo1 - SeqNo0 of
+ 0 ->
+ reply(0, State #q{backing_queue_state = BQS1});
+ Count ->
+ UC1 = dtree:insert(SeqNo1-1, [DestQPid], {reply, Count, From}, UC0),
+ QMons1 = pmon:monitor(DestQ#amqqueue.pid, QMons0),
+ noreply(State #q{backing_queue_state = BQS1,
+ publish_seqno = SeqNo1,
+ unconfirmed = UC1,
+ queue_monitors = QMons1})
+ end.
handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) ->
- {MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC),
+ {MsgSeqNoActions, UC1} = dtree:take(MsgSeqNos, QPid, UC),
State1 = case dtree:is_defined(QPid, UC1) of
false -> QMons = State#q.queue_monitors,
State#q{queue_monitors = pmon:demonitor(QPid, QMons)};
true -> State
end,
- cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
+ cleanup_after_confirm([Action || {_MsgSeqNo, Action} <- MsgSeqNoActions],
State1#q{unconfirmed = UC1});
handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
diff --git a/src/rabbit_exchange_decorator_lvc.erl b/src/rabbit_exchange_decorator_lvc.erl
new file mode 100644
index 00000000..b5ce1954
--- /dev/null
+++ b/src/rabbit_exchange_decorator_lvc.erl
@@ -0,0 +1,111 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2013-2013 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_exchange_decorator_lvc).
+-include("rabbit.hrl").
+
+-rabbit_boot_step({?MODULE,
+ [{description, "LVC exchange decorator"},
+ {mfa, {rabbit_registry, register,
+ [exchange_decorator, <<"lvc">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [exchange_decorator_route, <<"lvc">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"lvc">>, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, recovery}]}).
+
+-behaviour(rabbit_exchange_decorator).
+-behaviour(rabbit_policy_validator).
+
+-export([description/0, serialise_events/1]).
+-export([create/2, delete/3, add_binding/3, remove_bindings/3,
+ policy_changed/3, route/2]).
+-export([validate_policy/1]).
+
+%%----------------------------------------------------------------------------
+
+description() ->
+ [{description, <<"LVC exchange decorator">>}].
+
+serialise_events(_) -> false.
+
+create(_Tx, _X) -> ok.
+
+delete(_Tx, _X, _Bs) -> ok.
+
+add_binding(none, X, #binding{key = Key,
+ destination = #resource{kind = queue} = Queue}) ->
+ case policy(X) of
+ false ->
+ ok;
+ _ ->
+ rabbit_amqqueue:with(
+ queue_name(X, Key),
+ fun (#amqqueue{pid = QPid}) ->
+ gen_server2:call(QPid, {copy, Queue})
+ end),
+ ok
+ end;
+add_binding(_Tx, _X, _B) ->
+ ok.
+
+remove_bindings(transaction, _X, _Bs) ->
+ ok;
+remove_bindings(none, X = #exchange{name = _XName}, _Bs) ->
+ case policy(X) of
+ false -> ok;
+ _Max -> ok % TODO: no key in binding?
+ end.
+
+route(X, #delivery{message = #basic_message{routing_keys = RKs}}) ->
+ case policy(X) of
+ false ->
+ [];
+ Max ->
+ [begin
+ rabbit_amqqueue:declare(queue_name(X, RK), false, false,
+ [{<<"x-max-length">>, long, Max}],
+ none),
+ queue_name(X, RK)
+ end || RK <- RKs]
+ end.
+
+policy_changed(_Tx, _OldX, _NewX) -> ok.
+
+%%----------------------------------------------------------------------------
+
+validate_policy([{<<"lvc">>, MaxCache}]) when is_integer(MaxCache) ->
+ ok;
+validate_policy(Invalid) ->
+ {error, "~p invalid LVC policy", [Invalid]}.
+
+%%----------------------------------------------------------------------------
+
+queue_name(#exchange{name = #resource{virtual_host = VHostPath,
+ kind = exchange,
+ name = Name}}, Key) ->
+ rabbit_misc:r(
+ VHostPath, queue,
+ list_to_binary([Name, ".", Key, ".",
+ base64:encode(erlang:md5(term_to_binary({Name, Key})))])).
+
+
+policy(#exchange{} = X) ->
+ case rabbit_policy:get(<<"lvc">>, X) of
+ {ok, Max} -> Max;
+ {error, not_found} -> false
+ end.