summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-07-01 11:59:37 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-07-01 11:59:37 +0100
commitf983d72a7a2580b5495d54f9b62e2dadae047ca0 (patch)
tree6a34d187353d68104e35a1dc8f424d056fcaef35
parent8de3137ff40361c31fc4cfd5332ef0e0f7336877 (diff)
downloadrabbitmq-server-bug22924.tar.gz
move queue equivalence and exclusivity checks into amqqueue modulebug22924
...from the channel, since the latter should have no in-depth knowledge what these checks entail.
-rw-r--r--src/rabbit_amqqueue.erl32
-rw-r--r--src/rabbit_channel.erl44
2 files changed, 46 insertions, 30 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 378d0cbc..0aa7445a 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -37,7 +37,8 @@
update_ram_duration/1, set_ram_duration_target/2,
set_maximum_since_use/2]).
-export([pseudo_queue/2]).
--export([lookup/1, with/2, with_or_die/2,
+-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
+ check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, stat_all/0, deliver/2, requeue/3, ack/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([consumers/1, consumers_all/1]).
@@ -70,6 +71,10 @@
-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
-spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()).
-spec(with_or_die/2 :: (queue_name(), qfun(A)) -> A).
+-spec(assert_equivalence/5 :: (amqqueue(), boolean(), boolean(), amqp_table(),
+ maybe(pid)) -> ok).
+-spec(check_exclusive_access/2 :: (amqqueue(), pid()) -> 'ok').
+-spec(with_exclusive_access_or_die/3 :: (queue_name(), pid(), qfun(A)) -> A).
-spec(list/1 :: (vhost()) -> [amqqueue()]).
-spec(info_keys/0 :: () -> [info_key()]).
-spec(info/1 :: (amqqueue()) -> [info()]).
@@ -213,6 +218,31 @@ with(Name, F) ->
with_or_die(Name, F) ->
with(Name, F, fun () -> rabbit_misc:not_found(Name) end).
+assert_equivalence(#amqqueue{durable = Durable, auto_delete = AutoDelete} = Q,
+ Durable, AutoDelete, _Args, Owner) ->
+ check_exclusive_access(Q, Owner, strict);
+assert_equivalence(#amqqueue{name = QueueName},
+ _Durable, _AutoDelete, _Args, _Owner) ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "parameters for ~s not equivalent",
+ [rabbit_misc:rs(QueueName)]).
+
+check_exclusive_access(Q, Owner) -> check_exclusive_access(Q, Owner, lax).
+
+check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) ->
+ ok;
+check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) ->
+ ok;
+check_exclusive_access(#amqqueue{name = QueueName}, _ReaderPid, _MatchType) ->
+ rabbit_misc:protocol_error(
+ resource_locked,
+ "cannot obtain exclusive access to locked ~s",
+ [rabbit_misc:rs(QueueName)]).
+
+with_exclusive_access_or_die(Name, ReaderPid, F) ->
+ with_or_die(Name,
+ fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end).
+
list(VHostPath) ->
mnesia:dirty_match_object(
rabbit_queue,
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 5af893cb..3b2af5cb 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -324,19 +324,6 @@ check_write_permitted(Resource, #ch{ username = Username}) ->
check_read_permitted(Resource, #ch{ username = Username}) ->
check_resource_access(Username, Resource, read).
-check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) ->
- ok;
-check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) ->
- ok;
-check_exclusive_access(#amqqueue{name = QName}, _ReaderPid, _MatchType) ->
- rabbit_misc:protocol_error(
- resource_locked,
- "cannot obtain exclusive access to locked ~s", [rabbit_misc:rs(QName)]).
-
-with_exclusive_access_or_die(QName, ReaderPid, F) ->
- rabbit_amqqueue:with_or_die(
- QName, fun (Q) -> check_exclusive_access(Q, ReaderPid, lax), F(Q) end).
-
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
not_found, "no previously declared queue", []);
@@ -475,7 +462,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
next_tag = DeliveryTag }) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
- case with_exclusive_access_or_die(
+ case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ReaderPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
@@ -519,7 +506,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
%% We get the queue process to send the consume_ok on our
%% behalf. This is for symmetry with basic.cancel - see
%% the comment in that method for why.
- case with_exclusive_access_or_die(
+ case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ReaderPid,
fun (Q) ->
rabbit_amqqueue:basic_consume(
@@ -725,18 +712,15 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
check_configure_permitted(QueueName, State),
- case rabbit_amqqueue:with(QueueName,
- fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end) of
- {{ok, QueueName, MessageCount, ConsumerCount},
- #amqqueue{durable = Durable1, auto_delete = AutoDelete1} = Q}
- when Durable =:= Durable1, AutoDelete =:= AutoDelete1 ->
- check_exclusive_access(Q, Owner, strict),
+ case rabbit_amqqueue:with(
+ QueueName,
+ fun (Q) -> ok = rabbit_amqqueue:assert_equivalence(
+ Q, Durable, AutoDelete, Args, Owner),
+ rabbit_amqqueue:stat(Q)
+ end) of
+ {ok, QueueName, MessageCount, ConsumerCount} ->
return_queue_declare_ok(QueueName, NoWait, MessageCount,
ConsumerCount, State);
- {{ok, QueueName, _MessageCount, _ConsumerCount}, #amqqueue{}} ->
- rabbit_misc:protocol_error(
- precondition_failed, "parameters for ~s not equivalent",
- [rabbit_misc:rs(QueueName)]);
{error, not_found} ->
case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
Args, Owner) of
@@ -767,7 +751,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
{{ok, QueueName, MessageCount, ConsumerCount}, #amqqueue{} = Q} =
rabbit_amqqueue:with_or_die(
QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end),
- check_exclusive_access(Q, ReaderPid, lax),
+ ok = rabbit_amqqueue:check_exclusive_access(Q, ReaderPid),
return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount,
State);
@@ -778,7 +762,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
_, State = #ch{reader_pid = ReaderPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_configure_permitted(QueueName, State),
- case with_exclusive_access_or_die(
+ case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ReaderPid,
fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
{error, in_use} ->
@@ -814,7 +798,7 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
_, State = #ch{reader_pid = ReaderPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
- {ok, PurgedMessageCount} = with_exclusive_access_or_die(
+ {ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ReaderPid,
fun (Q) -> rabbit_amqqueue:purge(Q) end),
return_ok(State, NoWait,
@@ -922,7 +906,9 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_read_permitted(ExchangeName, State),
case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments,
- fun (_X, Q) -> check_exclusive_access(Q, ReaderPid, lax) end) of
+ fun (_X, Q) ->
+ rabbit_amqqueue:check_exclusive_access(Q, ReaderPid)
+ end) of
{error, exchange_not_found} ->
rabbit_misc:not_found(ExchangeName);
{error, queue_not_found} ->