summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlvaro Videla <alvaro@rabbitmq.com>2013-12-18 13:19:40 +0100
committerAlvaro Videla <alvaro@rabbitmq.com>2013-12-18 13:19:40 +0100
commit5e9bb2c4177f1c0c2ad3ec8b25df7a397be175c7 (patch)
tree9a3295d794b8eebb75bc7b2a6f866bcdcf2befe8
parent3bfaffaf7e626a5fce0c8e2898b5495b64c497ff (diff)
parentcb12d6bc971bc3ac02a3635f001bb338d7825ec4 (diff)
downloadrabbitmq-server-5e9bb2c4177f1c0c2ad3ec8b25df7a397be175c7.tar.gz
merge default into bug25817
-rw-r--r--src/rabbit_channel.erl82
-rw-r--r--src/rabbit_channel_interceptor.erl77
-rw-r--r--src/rabbit_registry.erl15
3 files changed, 150 insertions, 24 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 4d778f94..ff6bbb11 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -272,7 +272,9 @@ handle_cast({method, Method, Content, Flow},
flow -> credit_flow:ack(Reader);
noflow -> ok
end,
- try handle_method(Method, Content, State) of
+ %% handle MRDQ before calling handle method
+ Method2 = handle_expand_shortcuts(Method, State),
+ try handle_method(Method2, Content, State) of
{reply, Reply, NewState} ->
ok = send(Reply, NewState),
noreply(NewState);
@@ -282,7 +284,7 @@ handle_cast({method, Method, Content, Flow},
{stop, normal, State}
catch
exit:Reason = #amqp_error{} ->
- MethodName = rabbit_misc:method_record_type(Method),
+ MethodName = rabbit_misc:method_record_type(Method2),
handle_exception(Reason#amqp_error{method = MethodName}, State);
_:Reason ->
{stop, {Reason, erlang:get_stacktrace()}, State}
@@ -519,14 +521,24 @@ check_internal_exchange(#exchange{name = Name, internal = true}) ->
check_internal_exchange(_) ->
ok.
+binding_to_resource(queue, DestinationNameBin, State) ->
+ queue_bin_to_resource(DestinationNameBin, State);
+binding_to_resource(exchange, DestinationNameBin, State) ->
+ exchange_bin_to_resource(DestinationNameBin, State).
+
+queue_bin_to_resource(QueueNameBin, #ch{virtual_host = VHostPath}) ->
+ rabbit_misc:r(VHostPath, queue, QueueNameBin).
+
+exchange_bin_to_resource(ExchangeNameBin, #ch{virtual_host = VHostPath}) ->
+ rabbit_misc:r(VHostPath, queue, ExchangeNameBin).
+
expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) ->
rabbit_misc:protocol_error(
not_found, "no previously declared queue", []);
-expand_queue_name_shortcut(<<>>, #ch{virtual_host = VHostPath,
- most_recently_declared_queue = MRDQ}) ->
- rabbit_misc:r(VHostPath, queue, MRDQ);
-expand_queue_name_shortcut(QueueNameBin, #ch{virtual_host = VHostPath}) ->
- rabbit_misc:r(VHostPath, queue, QueueNameBin).
+expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = MRDQ}) ->
+ MRDQ;
+expand_queue_name_shortcut(QueueNameBin, _) ->
+ QueueNameBin.
expand_routing_key_shortcut(<<>>, <<>>,
#ch{most_recently_declared_queue = <<>>}) ->
@@ -541,9 +553,46 @@ expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) ->
expand_binding(queue, DestinationNameBin, RoutingKey, State) ->
{expand_queue_name_shortcut(DestinationNameBin, State),
expand_routing_key_shortcut(DestinationNameBin, RoutingKey, State)};
-expand_binding(exchange, DestinationNameBin, RoutingKey, State) ->
- {rabbit_misc:r(State#ch.virtual_host, exchange, DestinationNameBin),
- RoutingKey}.
+expand_binding(exchange, DestinationNameBin, RoutingKey, _) ->
+ {DestinationNameBin, RoutingKey}.
+
+handle_expand_shortcuts(#'basic.get'{queue = QueueNameBin} = Method, State) ->
+ setelement(#'basic.get'.queue, Method,
+ expand_queue_name_shortcut(QueueNameBin, State));
+handle_expand_shortcuts(#'basic.consume'{queue = QueueNameBin} = Method, State) ->
+ setelement(#'basic.consume'.queue, Method,
+ expand_queue_name_shortcut(QueueNameBin, State));
+handle_expand_shortcuts(#'queue.delete'{queue = QueueNameBin} = Method, State) ->
+ setelement(#'queue.delete'.queue, Method,
+ expand_queue_name_shortcut(QueueNameBin, State));
+handle_expand_shortcuts(#'queue.purge'{queue = QueueNameBin} = Method, State) ->
+ setelement(#'queue.purge'.queue, Method,
+ expand_queue_name_shortcut(QueueNameBin, State));
+handle_expand_shortcuts(#'exchange.bind'{destination = DestinationNameBin,
+ routing_key = RoutingKey} = Method,
+ State) ->
+ {DestinationName, ActualRoutingKey} =
+ expand_binding(exchange, DestinationNameBin, RoutingKey, State),
+ Method#'exchange.bind'{destination = DestinationName,
+ routing_key = ActualRoutingKey};
+handle_expand_shortcuts(#'exchange.unbind'{destination = DestinationNameBin,
+ routing_key = RoutingKey} = Method, State) ->
+ {DestinationName, ActualRoutingKey} =
+ expand_binding(exchange, DestinationNameBin, RoutingKey, State),
+ Method#'exchange.unbind'{destination = DestinationName,
+ routing_key = ActualRoutingKey};
+handle_expand_shortcuts(#'queue.bind'{queue = QueueNameBin,
+ routing_key = RoutingKey} = Method,
+ State) ->
+ {DestinationName, ActualRoutingKey} =
+ expand_binding(queue, QueueNameBin, RoutingKey, State),
+ Method#'queue.bind'{queue = DestinationName, routing_key = ActualRoutingKey};
+handle_expand_shortcuts(#'queue.unbind'{queue = QueueNameBin,
+ routing_key = RoutingKey} = Method,
+ State) ->
+ {DestinationName, ActualRoutingKey} =
+ expand_binding(queue, QueueNameBin, RoutingKey, State),
+ Method#'queue.bind'{queue = DestinationName, routing_key = ActualRoutingKey}.
check_not_default_exchange(#resource{kind = exchange, name = <<"">>}) ->
rabbit_misc:protocol_error(
@@ -716,7 +765,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
conn_pid = ConnPid,
limiter = Limiter,
next_tag = DeliveryTag}) ->
- QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ QueueName = queue_bin_to_resource(QueueNameBin, State),
check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
@@ -754,7 +803,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
consumer_mapping = ConsumerMapping}) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
- QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ QueueName = queue_bin_to_resource(QueueNameBin, State),
check_read_permitted(QueueName, State),
ActualConsumerTag =
case ConsumerTag of
@@ -1067,7 +1116,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
if_empty = IfEmpty,
nowait = NoWait},
_, State = #ch{conn_pid = ConnPid}) ->
- QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ QueueName = queue_bin_to_resource(QueueNameBin, State),
check_configure_permitted(QueueName, State),
case rabbit_amqqueue:with(
QueueName,
@@ -1107,7 +1156,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin,
handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
_, State = #ch{conn_pid = ConnPid}) ->
- QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ QueueName = queue_bin_to_resource(QueueNameBin, State),
check_read_permitted(QueueName, State),
{ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
@@ -1281,15 +1330,14 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
conn_pid = ConnPid }) ->
- {DestinationName, ActualRoutingKey} =
- expand_binding(DestinationType, DestinationNameBin, RoutingKey, State),
+ DestinationName = binding_to_resource(DestinationType, DestinationNameBin, State),
check_write_permitted(DestinationName, State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
[check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]],
check_read_permitted(ExchangeName, State),
case Fun(#binding{source = ExchangeName,
destination = DestinationName,
- key = ActualRoutingKey,
+ key = RoutingKey,
args = Arguments},
fun (_X, Q = #amqqueue{}) ->
try rabbit_amqqueue:check_exclusive_access(Q, ConnPid)
diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl
new file mode 100644
index 00000000..1b0c01f2
--- /dev/null
+++ b/src/rabbit_channel_interceptor.erl
@@ -0,0 +1,77 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
+%%
+
+%% Since the AMQP methods used here are queue related,
+%% maybe we want this to be a queue_interceptor.
+
+-module(rabbit_channel_interceptor).
+
+-include("rabbit.hrl").
+
+-export([intercept_method/1]).
+
+-ifdef(use_specs).
+
+-type(intercept_method() :: rabbit_framing:amqp_method_name()).
+-type(original_method() :: rabbit_framing:amqp_method_record()).
+-type(processed_method() :: rabbit_framing:amqp_method_record()).
+
+-callback description() -> [proplists:property()].
+
+-callback intercept(original_method()) ->
+ rabbit_types:ok_or_error2(processed_method(), any()).
+
+%% Whether the interceptor wishes to intercept the amqp method
+-callback applies_to(intercept_method()) -> boolean().
+
+-else.
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [{description, 0}, {intercept, 1}, {applies_to, 1}];
+behaviour_info(_Other) ->
+ undefined.
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+intercept_method(M) ->
+ intercept_method(M, select(M)).
+
+intercept_method(M, []) ->
+ M;
+intercept_method(M, [I]) ->
+ case I:intercept(M) of
+ {ok, M2} ->
+ M2;
+ {error, Reason} ->
+ rabbit_misc:protocol_error(
+ internal_error, "~s",
+ [Reason])
+ end;
+intercept_method(M, _) ->
+ rabbit_misc:protocol_error(
+ internal_error,
+ "More than one interceptor defined for method: ~p",
+ [rabbit_misc:method_record_type(M)]).
+
+%% select the interceptors that apply to intercept_method().
+select(Method) ->
+ [M || {_, M} <- rabbit_registry:lookup_all(channel_interceptor),
+ code:which(M) =/= non_existing,
+ M:applies_to(Method)].
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index 3014aeb7..abb71e7a 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -126,13 +126,14 @@ sanity_check_module(ClassModule, Module) ->
true -> ok
end.
-class_module(exchange) -> rabbit_exchange_type;
-class_module(auth_mechanism) -> rabbit_auth_mechanism;
-class_module(runtime_parameter) -> rabbit_runtime_parameter;
-class_module(exchange_decorator) -> rabbit_exchange_decorator;
-class_module(queue_decorator) -> rabbit_queue_decorator;
-class_module(policy_validator) -> rabbit_policy_validator;
-class_module(ha_mode) -> rabbit_mirror_queue_mode.
+class_module(exchange) -> rabbit_exchange_type;
+class_module(auth_mechanism) -> rabbit_auth_mechanism;
+class_module(runtime_parameter) -> rabbit_runtime_parameter;
+class_module(exchange_decorator) -> rabbit_exchange_decorator;
+class_module(queue_decorator) -> rabbit_queue_decorator;
+class_module(policy_validator) -> rabbit_policy_validator;
+class_module(ha_mode) -> rabbit_mirror_queue_mode;
+class_module(channel_interceptor) -> rabbit_channel_interceptor.
%%---------------------------------------------------------------------------