summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-01-29 13:39:03 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-01-29 13:39:03 +0000
commit55eeddf70f52604f35b4ec98fdba5827d54631f9 (patch)
tree73caa4948a8218a07a97b264becd02ce59e231f2
parent58d814681fdc33df35e161c9580c2c924fcffccf (diff)
downloadrabbitmq-server-55eeddf70f52604f35b4ec98fdba5827d54631f9.tar.gz
Get rid of credit_map, allow initial credit setting through an argument to basic.consume.
-rw-r--r--src/rabbit_channel.erl47
1 files changed, 24 insertions, 23 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 56a10676..dae21389 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -39,7 +39,7 @@
queue_names, queue_monitors, consumer_mapping,
blocking, queue_consumers, delivering_queues,
queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
- unconfirmed, confirmed, capabilities, trace_state, credit_map}).
+ unconfirmed, confirmed, capabilities, trace_state}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -219,8 +219,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
unconfirmed = dtree:empty(),
confirmed = [],
capabilities = Capabilities,
- trace_state = rabbit_trace:init(VHost),
- credit_map = dict:new()},
+ trace_state = rabbit_trace:init(VHost)},
State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer),
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(State1, #ch.stats_timer,
@@ -720,11 +719,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
no_local = _, % FIXME: implement
no_ack = NoAck,
exclusive = ExclusiveConsume,
- nowait = NoWait},
+ nowait = NoWait,
+ arguments = Args},
_, State = #ch{conn_pid = ConnPid,
limiter = Limiter,
- consumer_mapping = ConsumerMapping,
- credit_map = CreditMap}) ->
+ consumer_mapping = ConsumerMapping}) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
@@ -742,15 +741,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
fun (Q) ->
- case dict:find(ActualConsumerTag, CreditMap) of
- {ok, {Credit, Count, Drain}} ->
- ok = rabbit_amqqueue:inform_limiter(
- Q, self(),
- {basic_credit, ActualConsumerTag,
- Credit, Count, Drain, false});
- error ->
- ok
- end,
+ maybe_set_initial_credit(Args, ActualConsumerTag, Q),
{rabbit_amqqueue:basic_consume(
Q, NoAck, self(), Limiter,
ActualConsumerTag, ExclusiveConsume,
@@ -760,11 +751,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
end) of
{ok, Q = #amqqueue{pid = QPid, name = QName}} ->
CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping),
- CrM1 = dict:erase(ActualConsumerTag, CreditMap),
State1 = monitor_delivering_queue(
NoAck, QPid, QName,
- State#ch{consumer_mapping = CM1,
- credit_map = CrM1}),
+ State#ch{consumer_mapping = CM1}),
{noreply,
case NoWait of
true -> consumer_monitor(ActualConsumerTag, State1);
@@ -1131,16 +1120,13 @@ handle_method(#'basic.credit'{consumer_tag = CTag,
credit = Credit,
count = Count,
drain = Drain}, _,
- State = #ch{consumer_mapping = Consumers,
- credit_map = CMap}) ->
+ State = #ch{consumer_mapping = Consumers}) ->
case dict:find(CTag, Consumers) of
{ok, Q} -> ok = rabbit_amqqueue:inform_limiter(
Q, self(),
{basic_credit, CTag, Credit, Count, Drain, true}),
{noreply, State};
- error -> CMap2 = dict:store(CTag, {Credit, Count, Drain}, CMap),
- {reply, #'basic.credit_ok'{available = 0},
- State#ch{credit_map = CMap2}}
+ error -> precondition_failed("unknown consumer tag '~s'", [CTag])
end;
handle_method(_MethodRecord, _Content, _State) ->
@@ -1209,6 +1195,21 @@ handle_consuming_queue_down(QPid,
handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
+maybe_set_initial_credit(Arguments, CTag, Q) ->
+ case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of
+ {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
+ rabbit_misc:table_lookup(T, <<"drain">>)} of
+ {{long, Credit}, {boolean, Drain}} ->
+ ok = rabbit_amqqueue:inform_limiter(
+ Q, self(),
+ {basic_credit, CTag, Credit, 0, Drain,
+ false});
+ _ ->
+ ok
+ end;
+ undefined -> ok
+ end.
+
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,