diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-07-01 11:59:37 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-07-01 11:59:37 +0100 |
commit | f983d72a7a2580b5495d54f9b62e2dadae047ca0 (patch) | |
tree | 6a34d187353d68104e35a1dc8f424d056fcaef35 | |
parent | 8de3137ff40361c31fc4cfd5332ef0e0f7336877 (diff) | |
download | rabbitmq-server-bug22924.tar.gz |
move queue equivalence and exclusivity checks into amqqueue modulebug22924
...from the channel, since the latter should have no in-depth
knowledge what these checks entail.
-rw-r--r-- | src/rabbit_amqqueue.erl | 32 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 44 |
2 files changed, 46 insertions, 30 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 378d0cbc..0aa7445a 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -37,7 +37,8 @@ update_ram_duration/1, set_ram_duration_target/2, set_maximum_since_use/2]). -export([pseudo_queue/2]). --export([lookup/1, with/2, with_or_die/2, +-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, + check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, stat_all/0, deliver/2, requeue/3, ack/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([consumers/1, consumers_all/1]). @@ -70,6 +71,10 @@ -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). -spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()). -spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A). +-spec(assert_equivalence/5 :: (amqqueue(), boolean(), boolean(), amqp_table(), + maybe(pid)) -> ok). +-spec(check_exclusive_access/2 :: (amqqueue(), pid()) -> 'ok'). +-spec(with_exclusive_access_or_die/3 :: (queue_name(), pid(), qfun(A)) -> A). -spec(list/1 :: (vhost()) -> [amqqueue()]). -spec(info_keys/0 :: () -> [info_key()]). -spec(info/1 :: (amqqueue()) -> [info()]). @@ -213,6 +218,31 @@ with(Name, F) -> with_or_die(Name, F) -> with(Name, F, fun () -> rabbit_misc:not_found(Name) end). +assert_equivalence(#amqqueue{durable = Durable, auto_delete = AutoDelete} = Q, + Durable, AutoDelete, _Args, Owner) -> + check_exclusive_access(Q, Owner, strict); +assert_equivalence(#amqqueue{name = QueueName}, + _Durable, _AutoDelete, _Args, _Owner) -> + rabbit_misc:protocol_error( + precondition_failed, "parameters for ~s not equivalent", + [rabbit_misc:rs(QueueName)]). + +check_exclusive_access(Q, Owner) -> check_exclusive_access(Q, Owner, lax). + +check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) -> + ok; +check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) -> + ok; +check_exclusive_access(#amqqueue{name = QueueName}, _ReaderPid, _MatchType) -> + rabbit_misc:protocol_error( + resource_locked, + "cannot obtain exclusive access to locked ~s", + [rabbit_misc:rs(QueueName)]). + +with_exclusive_access_or_die(Name, ReaderPid, F) -> + with_or_die(Name, + fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end). + list(VHostPath) -> mnesia:dirty_match_object( rabbit_queue, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 5af893cb..3b2af5cb 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -324,19 +324,6 @@ check_write_permitted(Resource, #ch{ username = Username}) -> check_read_permitted(Resource, #ch{ username = Username}) -> check_resource_access(Username, Resource, read). -check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) -> - ok; -check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) -> - ok; -check_exclusive_access(#amqqueue{name = QName}, _ReaderPid, _MatchType) -> - rabbit_misc:protocol_error( - resource_locked, - "cannot obtain exclusive access to locked ~s", [rabbit_misc:rs(QName)]). - -with_exclusive_access_or_die(QName, ReaderPid, F) -> - rabbit_amqqueue:with_or_die( - QName, fun (Q) -> check_exclusive_access(Q, ReaderPid, lax), F(Q) end). - expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( not_found, "no previously declared queue", []); @@ -475,7 +462,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, next_tag = DeliveryTag }) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), - case with_exclusive_access_or_die( + case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, @@ -519,7 +506,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, %% We get the queue process to send the consume_ok on our %% behalf. This is for symmetry with basic.cancel - see %% the comment in that method for why. - case with_exclusive_access_or_die( + case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:basic_consume( @@ -725,18 +712,15 @@ handle_method(#'queue.declare'{queue = QueueNameBin, end, QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), check_configure_permitted(QueueName, State), - case rabbit_amqqueue:with(QueueName, - fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end) of - {{ok, QueueName, MessageCount, ConsumerCount}, - #amqqueue{durable = Durable1, auto_delete = AutoDelete1} = Q} - when Durable =:= Durable1, AutoDelete =:= AutoDelete1 -> - check_exclusive_access(Q, Owner, strict), + case rabbit_amqqueue:with( + QueueName, + fun (Q) -> ok = rabbit_amqqueue:assert_equivalence( + Q, Durable, AutoDelete, Args, Owner), + rabbit_amqqueue:stat(Q) + end) of + {ok, QueueName, MessageCount, ConsumerCount} -> return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, State); - {{ok, QueueName, _MessageCount, _ConsumerCount}, #amqqueue{}} -> - rabbit_misc:protocol_error( - precondition_failed, "parameters for ~s not equivalent", - [rabbit_misc:rs(QueueName)]); {error, not_found} -> case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args, Owner) of @@ -767,7 +751,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, {{ok, QueueName, MessageCount, ConsumerCount}, #amqqueue{} = Q} = rabbit_amqqueue:with_or_die( QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end), - check_exclusive_access(Q, ReaderPid, lax), + ok = rabbit_amqqueue:check_exclusive_access(Q, ReaderPid), return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, State); @@ -778,7 +762,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin, _, State = #ch{reader_pid = ReaderPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_configure_permitted(QueueName, State), - case with_exclusive_access_or_die( + case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of {error, in_use} -> @@ -814,7 +798,7 @@ handle_method(#'queue.purge'{queue = QueueNameBin, _, State = #ch{reader_pid = ReaderPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), - {ok, PurgedMessageCount} = with_exclusive_access_or_die( + {ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:purge(Q) end), return_ok(State, NoWait, @@ -922,7 +906,9 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_read_permitted(ExchangeName, State), case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments, - fun (_X, Q) -> check_exclusive_access(Q, ReaderPid, lax) end) of + fun (_X, Q) -> + rabbit_amqqueue:check_exclusive_access(Q, ReaderPid) + end) of {error, exchange_not_found} -> rabbit_misc:not_found(ExchangeName); {error, queue_not_found} -> |