diff options
author | Emile Joubert <emile@rabbitmq.com> | 2013-03-15 17:38:37 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2013-03-15 17:38:37 +0000 |
commit | 1e4232307730f751b0bb4bc0adbf125a7ea35af1 (patch) | |
tree | ae592ebba86d40367f5511cdd5918c9d74ab67f8 | |
parent | 41f7641bd9d81ee6f45340dd6b5f6549e42e705d (diff) | |
download | rabbitmq-server-bug25287.tar.gz |
First draft of last value cachebug25287
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 42 | ||||
-rw-r--r-- | src/rabbit_exchange_decorator_lvc.erl | 111 |
2 files changed, 147 insertions, 6 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 18b641d4..b09a34ec 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -813,7 +813,7 @@ dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK, X, RK, SeqNo, QName) of [] -> {[AckTag | AckImm], SeqNo, UC, QMons}; QPids -> {AckImm, SeqNo + 1, - dtree:insert(SeqNo, QPids, AckTag, UC), + dtree:insert(SeqNo, QPids, {ack, AckTag}, UC), pmon:monitor_all(QPids, QMons)} end end, {[], SeqNo0, UC0, QMons0}, BQS), @@ -861,11 +861,17 @@ stop(From, Reply, State = #q{unconfirmed = UC}) -> {false, _} -> noreply(State#q{delayed_stop = {From, Reply}}) end. -cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS, +cleanup_after_confirm(Actions, State = #q{delayed_stop = DS, unconfirmed = UC, backing_queue = BQ, backing_queue_state = BQS}) -> - {_Guids, BQS1} = BQ:ack(AckTags, BQS), + Acks = lists:foldl(fun ({reply, Reply, From}, Acc) -> + gen_server2:reply(From, Reply), + Acc; + ({ack, Ack}, Acc) -> + [Ack | Acc] + end, [], Actions), + {_Guids, BQS1} = BQ:ack(Acks, BQS), State1 = State#q{backing_queue_state = BQS1}, case dtree:is_empty(UC) andalso DS =/= undefined of true -> case DS of @@ -1239,16 +1245,40 @@ handle_call(force_event_refresh, _From, {Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State), emit_consumer_created(Ch, CTag, true, AckRequired, QName) end, - reply(ok, State). + reply(ok, State); + +handle_call({copy, DestQName}, From, State = #q{backing_queue = BQ, + publish_seqno = SeqNo0, + queue_monitors = QMons0, + unconfirmed = UC0, + backing_queue_state = BQS0}) -> + {ok, #amqqueue{pid = DestQPid} = DestQ} = rabbit_amqqueue:lookup(DestQName), + {SeqNo1, BQS1} = + BQ:fold(fun (Msg, _Props, _Unacked, SeqNo) -> + Delivery = rabbit_basic:delivery(false, Msg, SeqNo), + rabbit_amqqueue:deliver([DestQ], Delivery), + {cont, SeqNo + 1} + end, SeqNo0, BQS0), + case SeqNo1 - SeqNo0 of + 0 -> + reply(0, State #q{backing_queue_state = BQS1}); + Count -> + UC1 = dtree:insert(SeqNo1-1, [DestQPid], {reply, Count, From}, UC0), + QMons1 = pmon:monitor(DestQ#amqqueue.pid, QMons0), + noreply(State #q{backing_queue_state = BQS1, + publish_seqno = SeqNo1, + unconfirmed = UC1, + queue_monitors = QMons1}) + end. handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) -> - {MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC), + {MsgSeqNoActions, UC1} = dtree:take(MsgSeqNos, QPid, UC), State1 = case dtree:is_defined(QPid, UC1) of false -> QMons = State#q.queue_monitors, State#q{queue_monitors = pmon:demonitor(QPid, QMons)}; true -> State end, - cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], + cleanup_after_confirm([Action || {_MsgSeqNo, Action} <- MsgSeqNoActions], State1#q{unconfirmed = UC1}); handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> diff --git a/src/rabbit_exchange_decorator_lvc.erl b/src/rabbit_exchange_decorator_lvc.erl new file mode 100644 index 00000000..b5ce1954 --- /dev/null +++ b/src/rabbit_exchange_decorator_lvc.erl @@ -0,0 +1,111 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2013-2013 VMware, Inc. All rights reserved. +%% + +-module(rabbit_exchange_decorator_lvc). +-include("rabbit.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "LVC exchange decorator"}, + {mfa, {rabbit_registry, register, + [exchange_decorator, <<"lvc">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [exchange_decorator_route, <<"lvc">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"lvc">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, recovery}]}). + +-behaviour(rabbit_exchange_decorator). +-behaviour(rabbit_policy_validator). + +-export([description/0, serialise_events/1]). +-export([create/2, delete/3, add_binding/3, remove_bindings/3, + policy_changed/3, route/2]). +-export([validate_policy/1]). + +%%---------------------------------------------------------------------------- + +description() -> + [{description, <<"LVC exchange decorator">>}]. + +serialise_events(_) -> false. + +create(_Tx, _X) -> ok. + +delete(_Tx, _X, _Bs) -> ok. + +add_binding(none, X, #binding{key = Key, + destination = #resource{kind = queue} = Queue}) -> + case policy(X) of + false -> + ok; + _ -> + rabbit_amqqueue:with( + queue_name(X, Key), + fun (#amqqueue{pid = QPid}) -> + gen_server2:call(QPid, {copy, Queue}) + end), + ok + end; +add_binding(_Tx, _X, _B) -> + ok. + +remove_bindings(transaction, _X, _Bs) -> + ok; +remove_bindings(none, X = #exchange{name = _XName}, _Bs) -> + case policy(X) of + false -> ok; + _Max -> ok % TODO: no key in binding? + end. + +route(X, #delivery{message = #basic_message{routing_keys = RKs}}) -> + case policy(X) of + false -> + []; + Max -> + [begin + rabbit_amqqueue:declare(queue_name(X, RK), false, false, + [{<<"x-max-length">>, long, Max}], + none), + queue_name(X, RK) + end || RK <- RKs] + end. + +policy_changed(_Tx, _OldX, _NewX) -> ok. + +%%---------------------------------------------------------------------------- + +validate_policy([{<<"lvc">>, MaxCache}]) when is_integer(MaxCache) -> + ok; +validate_policy(Invalid) -> + {error, "~p invalid LVC policy", [Invalid]}. + +%%---------------------------------------------------------------------------- + +queue_name(#exchange{name = #resource{virtual_host = VHostPath, + kind = exchange, + name = Name}}, Key) -> + rabbit_misc:r( + VHostPath, queue, + list_to_binary([Name, ".", Key, ".", + base64:encode(erlang:md5(term_to_binary({Name, Key})))])). + + +policy(#exchange{} = X) -> + case rabbit_policy:get(<<"lvc">>, X) of + {ok, Max} -> Max; + {error, not_found} -> false + end. |