summaryrefslogtreecommitdiff
path: root/src/rabbit_channel.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_channel.erl')
-rw-r--r--src/rabbit_channel.erl310
1 files changed, 206 insertions, 104 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 5fd9a512..b2716ec4 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -33,23 +33,29 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/4, do/2, do/3, shutdown/1]).
+-behaviour(gen_server2).
+
+-export([start_link/5, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, conserve_memory/2]).
-%% callbacks
--export([init/2, handle_message/2]).
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
--record(ch, {state, proxy_pid, reader_pid, writer_pid,
+-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host,
most_recently_declared_queue, consumer_mapping}).
+-define(HIBERNATE_AFTER, 1000).
+
+-define(MAX_PERMISSION_CACHE_SIZE, 12).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(start_link/4 :: (pid(), pid(), username(), vhost()) -> pid()).
+-spec(start_link/5 ::
+ (channel_number(), pid(), pid(), username(), vhost()) -> pid()).
-spec(do/2 :: (pid(), amqp_method()) -> 'ok').
-spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
@@ -61,112 +67,126 @@
%%----------------------------------------------------------------------------
-start_link(ReaderPid, WriterPid, Username, VHost) ->
- buffering_proxy:start_link(?MODULE, [ReaderPid, WriterPid,
- Username, VHost]).
+start_link(Channel, ReaderPid, WriterPid, Username, VHost) ->
+ {ok, Pid} = gen_server2:start_link(
+ ?MODULE, [Channel, ReaderPid, WriterPid,
+ Username, VHost], []),
+ Pid.
do(Pid, Method) ->
do(Pid, Method, none).
do(Pid, Method, Content) ->
- Pid ! {method, Method, Content},
- ok.
+ gen_server2:cast(Pid, {method, Method, Content}).
shutdown(Pid) ->
- Pid ! terminate,
- ok.
+ gen_server2:cast(Pid, terminate).
send_command(Pid, Msg) ->
- Pid ! {command, Msg},
- ok.
+ gen_server2:cast(Pid, {command, Msg}).
deliver(Pid, ConsumerTag, AckRequired, Msg) ->
- Pid ! {deliver, ConsumerTag, AckRequired, Msg},
- ok.
+ gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}).
conserve_memory(Pid, Conserve) ->
- Pid ! {conserve_memory, Conserve},
- ok.
+ gen_server2:cast(Pid, {conserve_memory, Conserve}).
%%---------------------------------------------------------------------------
-init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) ->
+init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
process_flag(trap_exit, true),
link(WriterPid),
- %% this is bypassing the proxy so alarms can "jump the queue" and
- %% be handled promptly
rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
- #ch{state = starting,
- proxy_pid = ProxyPid,
- reader_pid = ReaderPid,
- writer_pid = WriterPid,
- transaction_id = none,
- tx_participants = sets:new(),
- next_tag = 1,
- uncommitted_ack_q = queue:new(),
- unacked_message_q = queue:new(),
- username = Username,
- virtual_host = VHost,
- most_recently_declared_queue = <<>>,
- consumer_mapping = dict:new()}.
-
-handle_message({method, Method, Content}, State) ->
+ {ok, #ch{state = starting,
+ channel = Channel,
+ reader_pid = ReaderPid,
+ writer_pid = WriterPid,
+ limiter_pid = undefined,
+ transaction_id = none,
+ tx_participants = sets:new(),
+ next_tag = 1,
+ uncommitted_ack_q = queue:new(),
+ unacked_message_q = queue:new(),
+ username = Username,
+ virtual_host = VHost,
+ most_recently_declared_queue = <<>>,
+ consumer_mapping = dict:new()}}.
+
+handle_call(_Request, _From, State) ->
+ noreply(State).
+
+handle_cast({method, Method, Content}, State) ->
try handle_method(Method, Content, State) of
{reply, Reply, NewState} ->
ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply),
- NewState;
+ noreply(NewState);
{noreply, NewState} ->
- NewState;
+ noreply(NewState);
stop ->
- exit(normal)
+ {stop, normal, State#ch{state = terminating}}
catch
exit:{amqp, Error, Explanation, none} ->
- terminate({amqp, Error, Explanation,
- rabbit_misc:method_record_type(Method)},
- State);
+ ok = notify_queues(internal_rollback(State)),
+ Reason = {amqp, Error, Explanation,
+ rabbit_misc:method_record_type(Method)},
+ State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
+ {stop, normal, State#ch{state = terminating}};
exit:normal ->
- terminate(normal, State);
+ {stop, normal, State};
_:Reason ->
- terminate({Reason, erlang:get_stacktrace()}, State)
+ {stop, {Reason, erlang:get_stacktrace()}, State}
end;
-handle_message(terminate, State) ->
- terminate(normal, State);
+handle_cast(terminate, State) ->
+ {stop, normal, State};
-handle_message({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
+handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, Msg),
- State;
+ noreply(State);
-handle_message({deliver, ConsumerTag, AckRequired, Msg},
- State = #ch{proxy_pid = ProxyPid,
- writer_pid = WriterPid,
- next_tag = DeliveryTag}) ->
+handle_cast({deliver, ConsumerTag, AckRequired, Msg},
+ State = #ch{writer_pid = WriterPid,
+ next_tag = DeliveryTag}) ->
State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State),
- ok = internal_deliver(WriterPid, ProxyPid,
- true, ConsumerTag, DeliveryTag, Msg),
- State1#ch{next_tag = DeliveryTag + 1};
+ ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg),
+ noreply(State1#ch{next_tag = DeliveryTag + 1});
-handle_message({conserve_memory, Conserve}, State) ->
+handle_cast({conserve_memory, Conserve}, State) ->
+ ok = clear_permission_cache(),
ok = rabbit_writer:send_command(
State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}),
- State;
+ noreply(State).
-handle_message({'EXIT', _Pid, Reason}, State) ->
- terminate(Reason, State);
+handle_info({'EXIT', _Pid, Reason}, State) ->
+ {stop, Reason, State};
-handle_message(Other, State) ->
- terminate({unexpected_channel_message, Other}, State).
+handle_info(timeout, State) ->
+ ok = clear_permission_cache(),
+ %% TODO: Once we drop support for R11B-5, we can change this to
+ %% {noreply, State, hibernate};
+ proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]).
-%%---------------------------------------------------------------------------
+terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid,
+ state = terminating}) ->
+ rabbit_writer:shutdown(WriterPid),
+ rabbit_limiter:shutdown(LimiterPid);
-terminate(Reason, State = #ch{writer_pid = WriterPid}) ->
+terminate(Reason, State = #ch{writer_pid = WriterPid,
+ limiter_pid = LimiterPid}) ->
Res = notify_queues(internal_rollback(State)),
case Reason of
normal -> ok = Res;
_ -> ok
end,
rabbit_writer:shutdown(WriterPid),
- exit(Reason).
+ rabbit_limiter:shutdown(LimiterPid).
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%---------------------------------------------------------------------------
+
+noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}.
return_ok(State, true, _Msg) -> {noreply, State};
return_ok(State, false, Msg) -> {reply, Msg, State}.
@@ -190,6 +210,35 @@ return_queue_declare_ok(State, NoWait, Q) ->
{reply, Reply, NewState}
end.
+check_resource_access(Username, Resource, Perm) ->
+ V = {Resource, Perm},
+ Cache = case get(permission_cache) of
+ undefined -> [];
+ Other -> Other
+ end,
+ CacheTail =
+ case lists:member(V, Cache) of
+ true -> lists:delete(V, Cache);
+ false -> ok = rabbit_access_control:check_resource_access(
+ Username, Resource, Perm),
+ lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE - 1)
+ end,
+ put(permission_cache, [V | CacheTail]),
+ ok.
+
+clear_permission_cache() ->
+ erase(permission_cache),
+ ok.
+
+check_configure_permitted(Resource, #ch{ username = Username}) ->
+ check_resource_access(Username, Resource, #permission.configure).
+
+check_write_permitted(Resource, #ch{ username = Username}) ->
+ check_resource_access(Username, Resource, #permission.write).
+
+check_read_permitted(Resource, #ch{ username = Username}) ->
+ check_resource_access(Username, Resource, #permission.read).
+
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
not_allowed, "no previously declared queue", []);
@@ -248,7 +297,6 @@ handle_method(_Method, _, #ch{state = starting}) ->
handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
ok = notify_queues(internal_rollback(State)),
ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}),
- ok = rabbit_writer:shutdown(WriterPid),
stop;
handle_method(#'access.request'{},_, State) ->
@@ -260,6 +308,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
immediate = Immediate},
Content, State = #ch{ virtual_host = VHostPath}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
@@ -273,7 +322,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
content = DecodedContent,
persistent_key = PersistentKey},
- rabbit_exchange:route(Exchange, RoutingKey), State)};
+ rabbit_exchange:route(Exchange, RoutingKey, DecodedContent), State)};
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
@@ -286,9 +335,10 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
true -> ok
end,
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
- Participants = ack(State#ch.proxy_pid, TxnKey, Acked),
+ Participants = ack(TxnKey, Acked),
{noreply, case TxnKey of
- none -> State#ch{unacked_message_q = Remaining};
+ none -> ok = notify_limiter(State#ch.limiter_pid, Acked),
+ State#ch{unacked_message_q = Remaining};
_ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q,
Acked),
add_tx_participants(
@@ -299,12 +349,13 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
- _, State = #ch{ proxy_pid = ProxyPid, writer_pid = WriterPid,
+ _, State = #ch{ writer_pid = WriterPid,
next_tag = DeliveryTag }) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_or_die(
QueueName,
- fun (Q) -> rabbit_amqqueue:basic_get(Q, ProxyPid, NoAck) end) of
+ fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
Msg = {_QName, _QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
@@ -330,12 +381,13 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
no_ack = NoAck,
exclusive = ExclusiveConsume,
nowait = NoWait},
- _, State = #ch{ proxy_pid = ProxyPid,
- reader_pid = ReaderPid,
+ _, State = #ch{ reader_pid = ReaderPid,
+ limiter_pid = LimiterPid,
consumer_mapping = ConsumerMapping }) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ check_read_permitted(QueueName, State),
ActualConsumerTag =
case ConsumerTag of
<<>> -> rabbit_guid:binstring_guid("amq.ctag");
@@ -349,7 +401,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
QueueName,
fun (Q) ->
rabbit_amqqueue:basic_consume(
- Q, NoAck, ReaderPid, ProxyPid,
+ Q, NoAck, ReaderPid, self(), LimiterPid,
ActualConsumerTag, ExclusiveConsume,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag}))
@@ -380,8 +432,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
nowait = NoWait},
- _, State = #ch{ proxy_pid = ProxyPid,
- consumer_mapping = ConsumerMapping }) ->
+ _, State = #ch{consumer_mapping = ConsumerMapping }) ->
OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag},
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
@@ -402,7 +453,7 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
%% cancel_ok ourselves it might overtake a
%% message sent previously by the queue.
rabbit_amqqueue:basic_cancel(
- Q, ProxyPid, ConsumerTag,
+ Q, self(), ConsumerTag,
ok_msg(NoWait, #'basic.cancel_ok'{
consumer_tag = ConsumerTag}))
end) of
@@ -414,13 +465,34 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
end
end;
-handle_method(#'basic.qos'{}, _, State) ->
- %% FIXME: Need to implement QOS
- {reply, #'basic.qos_ok'{}, State};
+handle_method(#'basic.qos'{global = true}, _, _State) ->
+ rabbit_misc:protocol_error(not_implemented, "global=true", []);
+
+handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
+ rabbit_misc:protocol_error(not_implemented,
+ "prefetch_size!=0 (~w)", [Size]);
+
+handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
+ _, State = #ch{ limiter_pid = LimiterPid }) ->
+ NewLimiterPid = case {LimiterPid, PrefetchCount} of
+ {undefined, 0} ->
+ undefined;
+ {undefined, _} ->
+ LPid = rabbit_limiter:start_link(self()),
+ ok = limit_queues(LPid, State),
+ LPid;
+ {_, 0} ->
+ ok = rabbit_limiter:shutdown(LimiterPid),
+ ok = limit_queues(undefined, State),
+ undefined;
+ {_, _} ->
+ LimiterPid
+ end,
+ ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount),
+ {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}};
handle_method(#'basic.recover'{requeue = true},
_, State = #ch{ transaction_id = none,
- proxy_pid = ProxyPid,
unacked_message_q = UAMQ }) ->
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
@@ -429,14 +501,13 @@ handle_method(#'basic.recover'{requeue = true},
%% order. To keep it happy we reverse the id list
%% since we are given them in reverse order.
rabbit_amqqueue:requeue(
- QPid, lists:reverse(MsgIds), ProxyPid)
+ QPid, lists:reverse(MsgIds), self())
end, ok, UAMQ),
%% No answer required, apparently!
{noreply, State#ch{unacked_message_q = queue:new()}};
handle_method(#'basic.recover'{requeue = false},
_, State = #ch{ transaction_id = none,
- proxy_pid = ProxyPid,
writer_pid = WriterPid,
unacked_message_q = UAMQ }) ->
lists:foreach(
@@ -454,8 +525,7 @@ handle_method(#'basic.recover'{requeue = false},
%%
%% FIXME: should we allocate a fresh DeliveryTag?
ok = internal_deliver(
- WriterPid, ProxyPid,
- false, ConsumerTag, DeliveryTag,
+ WriterPid, false, ConsumerTag, DeliveryTag,
{QName, QPid, MsgId, true, Message})
end, queue:to_list(UAMQ)),
%% No answer required, apparently!
@@ -476,6 +546,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
_, State = #ch{ virtual_host = VHostPath }) ->
CheckedType = rabbit_exchange:check_type(TypeNameBin),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ check_configure_permitted(ExchangeName, State),
X = case rabbit_exchange:lookup(ExchangeName) of
{ok, FoundX} -> FoundX;
{error, not_found} ->
@@ -495,6 +566,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
nowait = NoWait},
_, State = #ch{ virtual_host = VHostPath }) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ check_configure_permitted(ExchangeName, State),
X = rabbit_exchange:lookup_or_die(ExchangeName),
ok = rabbit_exchange:assert_type(X, rabbit_exchange:check_type(TypeNameBin)),
return_ok(State, NoWait, #'exchange.declare_ok'{});
@@ -504,6 +576,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
nowait = NoWait},
_, State = #ch { virtual_host = VHostPath }) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ check_configure_permitted(ExchangeName, State),
case rabbit_exchange:delete(ExchangeName, IfUnused) of
{error, not_found} ->
rabbit_misc:protocol_error(
@@ -554,9 +627,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
Other -> check_name('queue', Other)
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
+ check_configure_permitted(QueueName, State),
Finish(rabbit_amqqueue:declare(QueueName,
Durable, AutoDelete, Args));
- Other -> Other
+ Other = #amqqueue{name = QueueName} ->
+ check_configure_permitted(QueueName, State),
+ Other
end,
return_queue_declare_ok(State, NoWait, Q);
@@ -565,6 +641,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
nowait = NoWait},
_, State = #ch{ virtual_host = VHostPath }) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
+ check_configure_permitted(QueueName, State),
Q = rabbit_amqqueue:with_or_die(QueueName, fun (Q) -> Q end),
return_queue_declare_ok(State, NoWait, Q);
@@ -575,6 +652,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
},
_, State) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ check_configure_permitted(QueueName, State),
case rabbit_amqqueue:with_or_die(
QueueName,
fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
@@ -611,6 +689,7 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
_, State) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ check_read_permitted(QueueName, State),
{ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die(
QueueName,
fun (Q) -> rabbit_amqqueue:purge(Q) end),
@@ -660,9 +739,11 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
%% FIXME: don't allow binding to internal exchanges -
%% including the one named "" !
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ check_write_permitted(QueueName, State),
ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey,
State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ check_read_permitted(ExchangeName, State),
case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of
{error, exchange_not_found} ->
rabbit_misc:protocol_error(
@@ -748,10 +829,10 @@ add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) ->
State#ch{tx_participants = sets:union(Participants,
sets:from_list(MoreP))}.
-ack(ProxyPid, TxnKey, UAQ) ->
+ack(TxnKey, UAQ) ->
fold_per_queue(
fun (QPid, MsgIds, L) ->
- ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, ProxyPid),
+ ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, self()),
[QPid | L]
end, [], UAQ).
@@ -766,7 +847,9 @@ internal_commit(State = #ch{transaction_id = TxnKey,
tx_participants = Participants}) ->
case rabbit_amqqueue:commit_all(sets:to_list(Participants),
TxnKey) of
- ok -> new_tx(State);
+ ok -> ok = notify_limiter(State#ch.limiter_pid,
+ State#ch.uncommitted_ack_q),
+ new_tx(State);
{error, Errors} -> rabbit_misc:protocol_error(
internal_error, "commit failed: ~w", [Errors])
end.
@@ -803,19 +886,37 @@ fold_per_queue(F, Acc0, UAQ) ->
dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
Acc0, D).
-notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) ->
- rabbit_amqqueue:notify_down_all(
- [QPid || QueueName <-
- sets:to_list(
- dict:fold(fun (_ConsumerTag, QueueName, S) ->
- sets:add_element(QueueName, S)
- end, sets:new(), Consumers)),
- case rabbit_amqqueue:lookup(QueueName) of
- {ok, Q} -> QPid = Q#amqqueue.pid, true;
- %% queue has been deleted in the meantime
- {error, not_found} -> QPid = none, false
- end],
- ProxyPid).
+notify_queues(#ch{consumer_mapping = Consumers}) ->
+ rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()).
+
+limit_queues(LPid, #ch{consumer_mapping = Consumers}) ->
+ rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid).
+
+consumer_queues(Consumers) ->
+ [QPid || QueueName <-
+ sets:to_list(
+ dict:fold(fun (_ConsumerTag, QueueName, S) ->
+ sets:add_element(QueueName, S)
+ end, sets:new(), Consumers)),
+ case rabbit_amqqueue:lookup(QueueName) of
+ {ok, Q} -> QPid = Q#amqqueue.pid, true;
+ %% queue has been deleted in the meantime
+ {error, not_found} -> QPid = none, false
+ end].
+
+%% tell the limiter about the number of acks that have been received
+%% for messages delivered to subscribed consumers, but not acks for
+%% messages sent in a response to a basic.get (identified by their
+%% 'none' consumer tag)
+notify_limiter(undefined, _Acked) ->
+ ok;
+notify_limiter(LimiterPid, Acked) ->
+ case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
+ ({_, _, _}, Acc) -> Acc + 1
+ end, 0, queue:to_list(Acked)) of
+ 0 -> ok;
+ Count -> rabbit_limiter:ack(LimiterPid, Count)
+ end.
is_message_persistent(#content{properties = #'P_basic'{
delivery_mode = Mode}}) ->
@@ -823,7 +924,8 @@ is_message_persistent(#content{properties = #'P_basic'{
1 -> false;
2 -> true;
undefined -> false;
- Other -> rabbit_log:warning("Unknown delivery mode ~p - treating as 1, non-persistent~n",
+ Other -> rabbit_log:warning("Unknown delivery mode ~p - "
+ "treating as 1, non-persistent~n",
[Other]),
false
end.
@@ -833,7 +935,7 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
lock_message(false, _MsgStruct, State) ->
State.
-internal_deliver(WriterPid, ChPid, Notify, ConsumerTag, DeliveryTag,
+internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag,
{_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
@@ -845,6 +947,6 @@ internal_deliver(WriterPid, ChPid, Notify, ConsumerTag, DeliveryTag,
routing_key = RoutingKey},
ok = case Notify of
true -> rabbit_writer:send_command_and_notify(
- WriterPid, QPid, ChPid, M, Content);
+ WriterPid, QPid, self(), M, Content);
false -> rabbit_writer:send_command(WriterPid, M, Content)
end.