diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-01-29 13:39:03 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-01-29 13:39:03 +0000 |
commit | 55eeddf70f52604f35b4ec98fdba5827d54631f9 (patch) | |
tree | 73caa4948a8218a07a97b264becd02ce59e231f2 | |
parent | 58d814681fdc33df35e161c9580c2c924fcffccf (diff) | |
download | rabbitmq-server-55eeddf70f52604f35b4ec98fdba5827d54631f9.tar.gz |
Get rid of credit_map, allow initial credit setting through an argument to basic.consume.
-rw-r--r-- | src/rabbit_channel.erl | 47 |
1 files changed, 24 insertions, 23 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 56a10676..dae21389 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -39,7 +39,7 @@ queue_names, queue_monitors, consumer_mapping, blocking, queue_consumers, delivering_queues, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, - unconfirmed, confirmed, capabilities, trace_state, credit_map}). + unconfirmed, confirmed, capabilities, trace_state}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -219,8 +219,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, unconfirmed = dtree:empty(), confirmed = [], capabilities = Capabilities, - trace_state = rabbit_trace:init(VHost), - credit_map = dict:new()}, + trace_state = rabbit_trace:init(VHost)}, State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer), rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(State1, #ch.stats_timer, @@ -720,11 +719,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin, no_local = _, % FIXME: implement no_ack = NoAck, exclusive = ExclusiveConsume, - nowait = NoWait}, + nowait = NoWait, + arguments = Args}, _, State = #ch{conn_pid = ConnPid, limiter = Limiter, - consumer_mapping = ConsumerMapping, - credit_map = CreditMap}) -> + consumer_mapping = ConsumerMapping}) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), @@ -742,15 +741,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, fun (Q) -> - case dict:find(ActualConsumerTag, CreditMap) of - {ok, {Credit, Count, Drain}} -> - ok = rabbit_amqqueue:inform_limiter( - Q, self(), - {basic_credit, ActualConsumerTag, - Credit, Count, Drain, false}); - error -> - ok - end, + maybe_set_initial_credit(Args, ActualConsumerTag, Q), {rabbit_amqqueue:basic_consume( Q, NoAck, self(), Limiter, ActualConsumerTag, ExclusiveConsume, @@ -760,11 +751,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin, end) of {ok, Q = #amqqueue{pid = QPid, name = QName}} -> CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping), - CrM1 = dict:erase(ActualConsumerTag, CreditMap), State1 = monitor_delivering_queue( NoAck, QPid, QName, - State#ch{consumer_mapping = CM1, - credit_map = CrM1}), + State#ch{consumer_mapping = CM1}), {noreply, case NoWait of true -> consumer_monitor(ActualConsumerTag, State1); @@ -1131,16 +1120,13 @@ handle_method(#'basic.credit'{consumer_tag = CTag, credit = Credit, count = Count, drain = Drain}, _, - State = #ch{consumer_mapping = Consumers, - credit_map = CMap}) -> + State = #ch{consumer_mapping = Consumers}) -> case dict:find(CTag, Consumers) of {ok, Q} -> ok = rabbit_amqqueue:inform_limiter( Q, self(), {basic_credit, CTag, Credit, Count, Drain, true}), {noreply, State}; - error -> CMap2 = dict:store(CTag, {Credit, Count, Drain}, CMap), - {reply, #'basic.credit_ok'{available = 0}, - State#ch{credit_map = CMap2}} + error -> precondition_failed("unknown consumer tag '~s'", [CTag]) end; handle_method(_MethodRecord, _Content, _State) -> @@ -1209,6 +1195,21 @@ handle_consuming_queue_down(QPid, handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> State#ch{delivering_queues = sets:del_element(QPid, DQ)}. +maybe_set_initial_credit(Arguments, CTag, Q) -> + case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of + {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>), + rabbit_misc:table_lookup(T, <<"drain">>)} of + {{long, Credit}, {boolean, Drain}} -> + ok = rabbit_amqqueue:inform_limiter( + Q, self(), + {basic_credit, CTag, Credit, 0, Drain, + false}); + _ -> + ok + end; + undefined -> ok + end. + binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, |