diff options
author | Alvaro Videla <alvaro@rabbitmq.com> | 2013-12-18 13:19:40 +0100 |
---|---|---|
committer | Alvaro Videla <alvaro@rabbitmq.com> | 2013-12-18 13:19:40 +0100 |
commit | 5e9bb2c4177f1c0c2ad3ec8b25df7a397be175c7 (patch) | |
tree | 9a3295d794b8eebb75bc7b2a6f866bcdcf2befe8 | |
parent | 3bfaffaf7e626a5fce0c8e2898b5495b64c497ff (diff) | |
parent | cb12d6bc971bc3ac02a3635f001bb338d7825ec4 (diff) | |
download | rabbitmq-server-5e9bb2c4177f1c0c2ad3ec8b25df7a397be175c7.tar.gz |
merge default into bug25817
-rw-r--r-- | src/rabbit_channel.erl | 82 | ||||
-rw-r--r-- | src/rabbit_channel_interceptor.erl | 77 | ||||
-rw-r--r-- | src/rabbit_registry.erl | 15 |
3 files changed, 150 insertions, 24 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 4d778f94..ff6bbb11 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -272,7 +272,9 @@ handle_cast({method, Method, Content, Flow}, flow -> credit_flow:ack(Reader); noflow -> ok end, - try handle_method(Method, Content, State) of + %% handle MRDQ before calling handle method + Method2 = handle_expand_shortcuts(Method, State), + try handle_method(Method2, Content, State) of {reply, Reply, NewState} -> ok = send(Reply, NewState), noreply(NewState); @@ -282,7 +284,7 @@ handle_cast({method, Method, Content, Flow}, {stop, normal, State} catch exit:Reason = #amqp_error{} -> - MethodName = rabbit_misc:method_record_type(Method), + MethodName = rabbit_misc:method_record_type(Method2), handle_exception(Reason#amqp_error{method = MethodName}, State); _:Reason -> {stop, {Reason, erlang:get_stacktrace()}, State} @@ -519,14 +521,24 @@ check_internal_exchange(#exchange{name = Name, internal = true}) -> check_internal_exchange(_) -> ok. +binding_to_resource(queue, DestinationNameBin, State) -> + queue_bin_to_resource(DestinationNameBin, State); +binding_to_resource(exchange, DestinationNameBin, State) -> + exchange_bin_to_resource(DestinationNameBin, State). + +queue_bin_to_resource(QueueNameBin, #ch{virtual_host = VHostPath}) -> + rabbit_misc:r(VHostPath, queue, QueueNameBin). + +exchange_bin_to_resource(ExchangeNameBin, #ch{virtual_host = VHostPath}) -> + rabbit_misc:r(VHostPath, queue, ExchangeNameBin). + expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) -> rabbit_misc:protocol_error( not_found, "no previously declared queue", []); -expand_queue_name_shortcut(<<>>, #ch{virtual_host = VHostPath, - most_recently_declared_queue = MRDQ}) -> - rabbit_misc:r(VHostPath, queue, MRDQ); -expand_queue_name_shortcut(QueueNameBin, #ch{virtual_host = VHostPath}) -> - rabbit_misc:r(VHostPath, queue, QueueNameBin). +expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = MRDQ}) -> + MRDQ; +expand_queue_name_shortcut(QueueNameBin, _) -> + QueueNameBin. expand_routing_key_shortcut(<<>>, <<>>, #ch{most_recently_declared_queue = <<>>}) -> @@ -541,9 +553,46 @@ expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) -> expand_binding(queue, DestinationNameBin, RoutingKey, State) -> {expand_queue_name_shortcut(DestinationNameBin, State), expand_routing_key_shortcut(DestinationNameBin, RoutingKey, State)}; -expand_binding(exchange, DestinationNameBin, RoutingKey, State) -> - {rabbit_misc:r(State#ch.virtual_host, exchange, DestinationNameBin), - RoutingKey}. +expand_binding(exchange, DestinationNameBin, RoutingKey, _) -> + {DestinationNameBin, RoutingKey}. + +handle_expand_shortcuts(#'basic.get'{queue = QueueNameBin} = Method, State) -> + setelement(#'basic.get'.queue, Method, + expand_queue_name_shortcut(QueueNameBin, State)); +handle_expand_shortcuts(#'basic.consume'{queue = QueueNameBin} = Method, State) -> + setelement(#'basic.consume'.queue, Method, + expand_queue_name_shortcut(QueueNameBin, State)); +handle_expand_shortcuts(#'queue.delete'{queue = QueueNameBin} = Method, State) -> + setelement(#'queue.delete'.queue, Method, + expand_queue_name_shortcut(QueueNameBin, State)); +handle_expand_shortcuts(#'queue.purge'{queue = QueueNameBin} = Method, State) -> + setelement(#'queue.purge'.queue, Method, + expand_queue_name_shortcut(QueueNameBin, State)); +handle_expand_shortcuts(#'exchange.bind'{destination = DestinationNameBin, + routing_key = RoutingKey} = Method, + State) -> + {DestinationName, ActualRoutingKey} = + expand_binding(exchange, DestinationNameBin, RoutingKey, State), + Method#'exchange.bind'{destination = DestinationName, + routing_key = ActualRoutingKey}; +handle_expand_shortcuts(#'exchange.unbind'{destination = DestinationNameBin, + routing_key = RoutingKey} = Method, State) -> + {DestinationName, ActualRoutingKey} = + expand_binding(exchange, DestinationNameBin, RoutingKey, State), + Method#'exchange.unbind'{destination = DestinationName, + routing_key = ActualRoutingKey}; +handle_expand_shortcuts(#'queue.bind'{queue = QueueNameBin, + routing_key = RoutingKey} = Method, + State) -> + {DestinationName, ActualRoutingKey} = + expand_binding(queue, QueueNameBin, RoutingKey, State), + Method#'queue.bind'{queue = DestinationName, routing_key = ActualRoutingKey}; +handle_expand_shortcuts(#'queue.unbind'{queue = QueueNameBin, + routing_key = RoutingKey} = Method, + State) -> + {DestinationName, ActualRoutingKey} = + expand_binding(queue, QueueNameBin, RoutingKey, State), + Method#'queue.bind'{queue = DestinationName, routing_key = ActualRoutingKey}. check_not_default_exchange(#resource{kind = exchange, name = <<"">>}) -> rabbit_misc:protocol_error( @@ -716,7 +765,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, conn_pid = ConnPid, limiter = Limiter, next_tag = DeliveryTag}) -> - QueueName = expand_queue_name_shortcut(QueueNameBin, State), + QueueName = queue_bin_to_resource(QueueNameBin, State), check_read_permitted(QueueName, State), case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, @@ -754,7 +803,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, consumer_mapping = ConsumerMapping}) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> - QueueName = expand_queue_name_shortcut(QueueNameBin, State), + QueueName = queue_bin_to_resource(QueueNameBin, State), check_read_permitted(QueueName, State), ActualConsumerTag = case ConsumerTag of @@ -1067,7 +1116,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin, if_empty = IfEmpty, nowait = NoWait}, _, State = #ch{conn_pid = ConnPid}) -> - QueueName = expand_queue_name_shortcut(QueueNameBin, State), + QueueName = queue_bin_to_resource(QueueNameBin, State), check_configure_permitted(QueueName, State), case rabbit_amqqueue:with( QueueName, @@ -1107,7 +1156,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin, handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, _, State = #ch{conn_pid = ConnPid}) -> - QueueName = expand_queue_name_shortcut(QueueNameBin, State), + QueueName = queue_bin_to_resource(QueueNameBin, State), check_read_permitted(QueueName, State), {ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, @@ -1281,15 +1330,14 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, conn_pid = ConnPid }) -> - {DestinationName, ActualRoutingKey} = - expand_binding(DestinationType, DestinationNameBin, RoutingKey, State), + DestinationName = binding_to_resource(DestinationType, DestinationNameBin, State), check_write_permitted(DestinationName, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), [check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]], check_read_permitted(ExchangeName, State), case Fun(#binding{source = ExchangeName, destination = DestinationName, - key = ActualRoutingKey, + key = RoutingKey, args = Arguments}, fun (_X, Q = #amqqueue{}) -> try rabbit_amqqueue:check_exclusive_access(Q, ConnPid) diff --git a/src/rabbit_channel_interceptor.erl b/src/rabbit_channel_interceptor.erl new file mode 100644 index 00000000..1b0c01f2 --- /dev/null +++ b/src/rabbit_channel_interceptor.erl @@ -0,0 +1,77 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved. +%% + +%% Since the AMQP methods used here are queue related, +%% maybe we want this to be a queue_interceptor. + +-module(rabbit_channel_interceptor). + +-include("rabbit.hrl"). + +-export([intercept_method/1]). + +-ifdef(use_specs). + +-type(intercept_method() :: rabbit_framing:amqp_method_name()). +-type(original_method() :: rabbit_framing:amqp_method_record()). +-type(processed_method() :: rabbit_framing:amqp_method_record()). + +-callback description() -> [proplists:property()]. + +-callback intercept(original_method()) -> + rabbit_types:ok_or_error2(processed_method(), any()). + +%% Whether the interceptor wishes to intercept the amqp method +-callback applies_to(intercept_method()) -> boolean(). + +-else. + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [{description, 0}, {intercept, 1}, {applies_to, 1}]; +behaviour_info(_Other) -> + undefined. + +-endif. + +%%---------------------------------------------------------------------------- + +intercept_method(M) -> + intercept_method(M, select(M)). + +intercept_method(M, []) -> + M; +intercept_method(M, [I]) -> + case I:intercept(M) of + {ok, M2} -> + M2; + {error, Reason} -> + rabbit_misc:protocol_error( + internal_error, "~s", + [Reason]) + end; +intercept_method(M, _) -> + rabbit_misc:protocol_error( + internal_error, + "More than one interceptor defined for method: ~p", + [rabbit_misc:method_record_type(M)]). + +%% select the interceptors that apply to intercept_method(). +select(Method) -> + [M || {_, M} <- rabbit_registry:lookup_all(channel_interceptor), + code:which(M) =/= non_existing, + M:applies_to(Method)]. diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl index 3014aeb7..abb71e7a 100644 --- a/src/rabbit_registry.erl +++ b/src/rabbit_registry.erl @@ -126,13 +126,14 @@ sanity_check_module(ClassModule, Module) -> true -> ok end. -class_module(exchange) -> rabbit_exchange_type; -class_module(auth_mechanism) -> rabbit_auth_mechanism; -class_module(runtime_parameter) -> rabbit_runtime_parameter; -class_module(exchange_decorator) -> rabbit_exchange_decorator; -class_module(queue_decorator) -> rabbit_queue_decorator; -class_module(policy_validator) -> rabbit_policy_validator; -class_module(ha_mode) -> rabbit_mirror_queue_mode. +class_module(exchange) -> rabbit_exchange_type; +class_module(auth_mechanism) -> rabbit_auth_mechanism; +class_module(runtime_parameter) -> rabbit_runtime_parameter; +class_module(exchange_decorator) -> rabbit_exchange_decorator; +class_module(queue_decorator) -> rabbit_queue_decorator; +class_module(policy_validator) -> rabbit_policy_validator; +class_module(ha_mode) -> rabbit_mirror_queue_mode; +class_module(channel_interceptor) -> rabbit_channel_interceptor. %%--------------------------------------------------------------------------- |