summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl167
-rw-r--r--src/rabbit_access_control.erl124
-rw-r--r--src/rabbit_amqqueue.erl97
-rw-r--r--src/rabbit_channel.erl178
-rw-r--r--src/rabbit_control.erl82
-rw-r--r--src/rabbit_error_logger.erl5
-rw-r--r--src/rabbit_error_logger_file_h.erl74
-rw-r--r--src/rabbit_exchange.erl38
-rw-r--r--src/rabbit_misc.erl100
-rw-r--r--src/rabbit_mnesia.erl19
-rw-r--r--src/rabbit_multi.erl66
-rw-r--r--src/rabbit_node_monitor.erl1
-rw-r--r--src/rabbit_reader.erl40
-rw-r--r--src/rabbit_realm.erl316
-rw-r--r--src/rabbit_router.erl8
-rw-r--r--src/rabbit_sasl_report_file_h.erl86
-rw-r--r--src/rabbit_tests.erl235
-rw-r--r--src/rabbit_ticket.erl131
-rw-r--r--src/rabbit_writer.erl11
19 files changed, 696 insertions, 1082 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index e65d532b..c6ef1749 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -27,10 +27,12 @@
-behaviour(application).
--export([start/0, stop/0, stop_and_halt/0, status/0]).
+-export([start/0, stop/0, stop_and_halt/0, status/0, rotate_logs/1]).
-export([start/2, stop/1]).
+-export([log_location/1]).
+
-import(application).
-import(mnesia).
-import(lists).
@@ -46,13 +48,18 @@
-ifdef(use_specs).
+-type(log_location() :: 'tty' | 'undefined' | string()).
+-type(file_suffix() :: binary()).
+
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_halt/0 :: () -> 'ok').
+-spec(rotate_logs/1 :: (file_suffix()) -> 'ok' | {'error', any()}).
-spec(status/0 :: () ->
[{running_applications, [{atom(), string(), string()}]} |
{nodes, [node()]} |
{running_nodes, [node()]}]).
+-spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()).
-endif.
@@ -60,7 +67,7 @@
start() ->
try
- ok = ensure_working_log_config(),
+ ok = ensure_working_log_handlers(),
ok = rabbit_mnesia:ensure_mnesia_dir(),
ok = start_applications(?APPS)
after
@@ -85,6 +92,15 @@ status() ->
[{running_applications, application:which_applications()}] ++
rabbit_mnesia:status().
+rotate_logs(BinarySuffix) ->
+ Suffix = binary_to_list(BinarySuffix),
+ log_rotation_result(rotate_logs(log_location(kernel),
+ Suffix,
+ rabbit_error_logger_file_h),
+ rotate_logs(log_location(sasl),
+ Suffix,
+ rabbit_sasl_report_file_h)).
+
%%--------------------------------------------------------------------
manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) ->
@@ -98,7 +114,7 @@ manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) ->
end
end, [], Apps),
ok.
-
+
start_applications(Apps) ->
manage_applications(fun lists:foldl/3,
fun application:start/1,
@@ -128,9 +144,9 @@ start(normal, []) ->
io:format("starting ~-20s ...", [Msg]),
Thunk(),
io:format("done~n");
- ({Msg, M, F, A}) ->
+ ({Msg, M, F, A}) ->
io:format("starting ~-20s ...", [Msg]),
- apply(M, F, A),
+ apply(M, F, A),
io:format("done~n")
end,
[{"database",
@@ -150,14 +166,12 @@ start(normal, []) ->
{"recovery",
fun () ->
ok = maybe_insert_default_data(),
-
ok = rabbit_exchange:recover(),
- ok = rabbit_amqqueue:recover(),
- ok = rabbit_realm:recover()
+ ok = rabbit_amqqueue:recover()
end},
{"persister",
- fun () ->
- ok = start_child(rabbit_persister)
+ fun () ->
+ ok = start_child(rabbit_persister)
end},
{"builtin applications",
fun () ->
@@ -188,6 +202,21 @@ stop(_State) ->
%---------------------------------------------------------------------------
+log_location(Type) ->
+ case application:get_env(Type, case Type of
+ kernel -> error_logger;
+ sasl -> sasl_error_logger
+ end) of
+ {ok, {file, File}} -> File;
+ {ok, false} -> undefined;
+ {ok, tty} -> tty;
+ {ok, silent} -> undefined;
+ {ok, Bad} -> throw({error, {cannot_log_to_file, Bad}});
+ _ -> undefined
+ end.
+
+%---------------------------------------------------------------------------
+
print_banner() ->
{ok, Product} = application:get_key(id),
{ok, Version} = application:get_key(vsn),
@@ -196,7 +225,9 @@ print_banner() ->
?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR,
?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
io:format("Logging to ~p~nSASL logging to ~p~n~n",
- [error_log_location(), sasl_log_location()]).
+ [log_location(kernel), log_location(sasl)]).
+
+
start_child(Mod) ->
{ok,_} = supervisor:start_child(rabbit_sup,
@@ -204,6 +235,43 @@ start_child(Mod) ->
transient, 100, worker, [Mod]}),
ok.
+ensure_working_log_handlers() ->
+ Handlers = gen_event:which_handlers(error_logger),
+ ok = ensure_working_log_handler(error_logger_file_h,
+ rabbit_error_logger_file_h,
+ error_logger_tty_h,
+ log_location(kernel),
+ Handlers),
+
+ ok = ensure_working_log_handler(sasl_report_file_h,
+ rabbit_sasl_report_file_h,
+ sasl_report_tty_h,
+ log_location(sasl),
+ Handlers),
+ ok.
+
+ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler,
+ LogLocation, Handlers) ->
+ case LogLocation of
+ undefined -> ok;
+ tty -> case lists:member(TTYHandler, Handlers) of
+ true -> ok;
+ false ->
+ throw({error, {cannot_log_to_tty,
+ TTYHandler, not_installed}})
+ end;
+ _ -> case lists:member(NewFHandler, Handlers) of
+ true -> ok;
+ false -> case rotate_logs(LogLocation, "",
+ OldFHandler, NewFHandler) of
+ ok -> ok;
+ {error, Reason} ->
+ throw({error, {cannot_log_to_file,
+ LogLocation, Reason}})
+ end
+ end
+ end.
+
maybe_insert_default_data() ->
case rabbit_mnesia:is_db_empty() of
true -> insert_default_data();
@@ -215,26 +283,8 @@ insert_default_data() ->
{ok, DefaultPass} = application:get_env(default_pass),
{ok, DefaultVHost} = application:get_env(default_vhost),
ok = rabbit_access_control:add_vhost(DefaultVHost),
- ok = insert_default_user(DefaultUser, DefaultPass,
- [{DefaultVHost, [<<"/data">>, <<"/admin">>]}]),
- ok.
-
-insert_default_user(Username, Password, VHostSpecs) ->
- ok = rabbit_access_control:add_user(Username, Password),
- lists:foreach(
- fun ({VHostPath, Realms}) ->
- ok = rabbit_access_control:map_user_vhost(
- Username, VHostPath),
- lists:foreach(
- fun (Realm) ->
- RealmFullName =
- rabbit_misc:r(VHostPath, realm, Realm),
- ok = rabbit_access_control:map_user_realm(
- Username,
- rabbit_access_control:full_ticket(
- RealmFullName))
- end, Realms)
- end, VHostSpecs),
+ ok = rabbit_access_control:add_user(DefaultUser, DefaultPass),
+ ok = rabbit_access_control:map_user_vhost(DefaultUser, DefaultVHost),
ok.
start_builtin_amq_applications() ->
@@ -243,40 +293,25 @@ start_builtin_amq_applications() ->
%%restart
ok.
-ensure_working_log_config() ->
- case error_logger:logfile(filename) of
- {error, no_log_file} ->
- %% either no log file was configured or opening it failed.
- case application:get_env(kernel, error_logger) of
- {ok, {file, Filename}} ->
- case filelib:ensure_dir(Filename) of
- ok -> ok;
- {error, Reason1} ->
- throw({error, {cannot_log_to_file,
- Filename, Reason1}})
- end,
- case error_logger:logfile({open, Filename}) of
- ok -> ok;
- {error, Reason2} ->
- throw({error, {cannot_log_to_file,
- Filename, Reason2}})
- end;
- _ -> ok
- end;
- _Filename -> ok
- end.
-
-error_log_location() ->
- case error_logger:logfile(filename) of
- {error,no_log_file} -> tty;
- File -> File
+rotate_logs(File, Suffix, Handler) ->
+ rotate_logs(File, Suffix, Handler, Handler).
+
+rotate_logs(File, Suffix, OldHandler, NewHandler) ->
+ case File of
+ undefined -> ok;
+ tty -> ok;
+ _ -> gen_event:swap_handler(
+ error_logger,
+ {OldHandler, swap},
+ {NewHandler, {File, Suffix}})
end.
-sasl_log_location() ->
- case application:get_env(sasl, sasl_error_logger) of
- {ok, {file, File}} -> File;
- {ok, false} -> undefined;
- {ok, tty} -> tty;
- {ok, Bad} -> throw({error, {cannot_log_to_file, Bad}});
- _ -> undefined
- end.
+log_rotation_result({error, MainLogError}, {error, SaslLogError}) ->
+ {error, {{cannot_rotate_main_logs, MainLogError},
+ {cannot_rotate_sasl_logs, SaslLogError}}};
+log_rotation_result({error, MainLogError}, ok) ->
+ {error, {cannot_rotate_main_logs, MainLogError}};
+log_rotation_result(ok, {error, SaslLogError}) ->
+ {error, {cannot_rotate_sasl_logs, SaslLogError}};
+log_rotation_result(ok, ok) ->
+ ok.
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 2be07b19..4342e15b 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -28,12 +28,11 @@
-include("rabbit.hrl").
-export([check_login/2, user_pass_login/2,
- check_vhost_access/2, lookup_realm_access/2]).
+ check_vhost_access/2]).
-export([add_user/2, delete_user/1, change_password/2, list_users/0,
lookup_user/1]).
-export([add_vhost/1, delete_vhost/1, list_vhosts/0, list_vhost_users/1]).
-export([list_user_vhosts/1, map_user_vhost/2, unmap_user_vhost/2]).
--export([list_user_realms/2, map_user_realm/2, full_ticket/1]).
%%----------------------------------------------------------------------------
@@ -42,7 +41,6 @@
-spec(check_login/2 :: (binary(), binary()) -> user()).
-spec(user_pass_login/2 :: (username(), password()) -> user()).
-spec(check_vhost_access/2 :: (user(), vhost()) -> 'ok').
--spec(lookup_realm_access/2 :: (user(), realm_name()) -> maybe(ticket())).
-spec(add_user/2 :: (username(), password()) -> 'ok').
-spec(delete_user/1 :: (username()) -> 'ok').
-spec(change_password/2 :: (username(), password()) -> 'ok').
@@ -55,9 +53,6 @@
-spec(list_user_vhosts/1 :: (username()) -> [vhost()]).
-spec(map_user_vhost/2 :: (username(), vhost()) -> 'ok').
-spec(unmap_user_vhost/2 :: (username(), vhost()) -> 'ok').
--spec(map_user_realm/2 :: (username(), ticket()) -> 'ok').
--spec(list_user_realms/2 :: (username(), vhost()) -> [{name(), ticket()}]).
--spec(full_ticket/1 :: (realm_name()) -> ticket()).
-endif.
@@ -87,7 +82,7 @@ check_login(<<"AMQPLAIN">>, Response) ->
[LoginTable])
end;
-check_login(Mechanism, _Response) ->
+check_login(Mechanism, _Response) ->
rabbit_misc:protocol_error(
access_refused, "unsupported authentication mechanism '~s'",
[Mechanism]).
@@ -130,18 +125,6 @@ check_vhost_access(#user{username = Username}, VHostPath) ->
[VHostPath, Username])
end.
-lookup_realm_access(#user{username = Username}, RealmName = #resource{kind = realm}) ->
- %% TODO: use dirty ops instead
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case user_realms(Username, RealmName) of
- [] ->
- none;
- [#user_realm{ticket_pattern = TicketPattern}] ->
- TicketPattern
- end
- end).
-
add_user(Username, Password) ->
R = rabbit_misc:execute_mnesia_transaction(
fun () ->
@@ -162,8 +145,7 @@ delete_user(Username) ->
Username,
fun () ->
ok = mnesia:delete({user, Username}),
- ok = mnesia:delete({user_vhost, Username}),
- ok = mnesia:delete({user_realm, Username})
+ ok = mnesia:delete({user_vhost, Username})
end)),
rabbit_log:info("Deleted user ~p~n", [Username]),
R.
@@ -191,24 +173,14 @@ add_vhost(VHostPath) ->
case mnesia:read({vhost, VHostPath}) of
[] ->
ok = mnesia:write(#vhost{virtual_host = VHostPath}),
- DataRealm =
- rabbit_misc:r(VHostPath, realm, <<"/data">>),
- AdminRealm =
- rabbit_misc:r(VHostPath, realm, <<"/admin">>),
- ok = rabbit_realm:add_realm(DataRealm),
- ok = rabbit_realm:add_realm(AdminRealm),
- #exchange{} = rabbit_exchange:declare(
- DataRealm, <<"">>,
- direct, true, false, []),
- #exchange{} = rabbit_exchange:declare(
- DataRealm, <<"amq.direct">>,
- direct, true, false, []),
- #exchange{} = rabbit_exchange:declare(
- DataRealm, <<"amq.topic">>,
- topic, true, false, []),
- #exchange{} = rabbit_exchange:declare(
- DataRealm, <<"amq.fanout">>,
- fanout, true, false, []),
+ [rabbit_exchange:declare(
+ rabbit_misc:r(VHostPath, exchange, Name),
+ Type, true, false, []) ||
+ {Name,Type} <-
+ [{<<"">>, direct},
+ {<<"amq.direct">>, direct},
+ {<<"amq.topic">>, topic},
+ {<<"amq.fanout">>, fanout}]],
ok;
[_] ->
mnesia:abort({vhost_already_exists, VHostPath})
@@ -240,11 +212,6 @@ internal_delete_vhost(VHostPath) ->
ok = rabbit_exchange:delete(Name, false)
end,
rabbit_exchange:list_vhost_exchanges(VHostPath)),
- lists:foreach(fun (RealmName) ->
- ok = rabbit_realm:delete_realm(
- rabbit_misc:r(VHostPath, realm, RealmName))
- end,
- rabbit_realm:list_vhost_realms(VHostPath)),
lists:foreach(fun (Username) ->
ok = unmap_user_vhost(Username, VHostPath)
end,
@@ -290,77 +257,8 @@ unmap_user_vhost(Username, VHostPath) ->
rabbit_misc:with_user_and_vhost(
Username, VHostPath,
fun () ->
- lists:foreach(fun mnesia:delete_object/1,
- user_realms(Username,
- rabbit_misc:r(VHostPath, realm))),
ok = mnesia:delete_object(
#user_vhost{username = Username,
virtual_host = VHostPath})
end)).
-map_user_realm(Username,
- Ticket = #ticket{realm_name = RealmName =
- #resource{virtual_host = VHostPath,
- kind = realm}}) ->
- rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_user_and_vhost(
- Username, VHostPath,
- rabbit_misc:with_realm(
- RealmName,
- fun () ->
- lists:foreach(fun mnesia:delete_object/1,
- user_realms(Username, RealmName)),
- case internal_lookup_vhost_access(Username, VHostPath) of
- {ok, _R} ->
- case ticket_liveness(Ticket) of
- alive ->
- ok = mnesia:write(
- #user_realm{username = Username,
- realm = RealmName,
- ticket_pattern = Ticket});
- dead ->
- ok
- end;
- not_found ->
- mnesia:abort(not_mapped_to_vhost)
- end
- end))).
-
-list_user_realms(Username, VHostPath) ->
- [{Name, Pattern} ||
- #user_realm{realm = #resource{name = Name},
- ticket_pattern = Pattern} <-
- %% TODO: use dirty ops instead
- rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_user_and_vhost(
- Username, VHostPath,
- fun () ->
- case internal_lookup_vhost_access(
- Username, VHostPath) of
- {ok, _R} ->
- user_realms(Username,
- rabbit_misc:r(VHostPath, realm));
- not_found ->
- mnesia:abort(not_mapped_to_vhost)
- end
- end))].
-
-ticket_liveness(#ticket{passive_flag = false,
- active_flag = false,
- write_flag = false,
- read_flag = false}) ->
- dead;
-ticket_liveness(_) ->
- alive.
-
-full_ticket(RealmName) ->
- #ticket{realm_name = RealmName,
- passive_flag = true,
- active_flag = true,
- write_flag = true,
- read_flag = true}.
-
-user_realms(Username, RealmName) ->
- mnesia:match_object(#user_realm{username = Username,
- realm = RealmName,
- _ = '_'}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 63f043ba..bd64f1e4 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -25,15 +25,15 @@
-module(rabbit_amqqueue).
--export([start/0, recover/0, declare/5, delete/3, purge/1, internal_delete/1]).
--export([pseudo_queue/3]).
+-export([start/0, recover/0, declare/4, delete/3, purge/1, internal_delete/1]).
+-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1,
- stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4,
- commit/2, rollback/2]).
+ stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]).
-export([add_binding/4, delete_binding/4, binding_forcibly_removed/2]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
--export([notify_sent/2, notify_down/2]).
+-export([notify_sent/2]).
+-export([commit_all/2, rollback_all/2, notify_down_all/2]).
-export([on_node_down/1]).
-import(mnesia).
@@ -44,6 +44,8 @@
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
+-define(CALL_TIMEOUT, 5000).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -53,9 +55,12 @@
-type(qfun(A) :: fun ((amqqueue()) -> A)).
-type(bind_res() :: {'ok', non_neg_integer()} |
{'error', 'queue_not_found' | 'exchange_not_found'}).
+-type(ok_or_errors() ::
+ 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
+
-spec(start/0 :: () -> 'ok').
-spec(recover/0 :: () -> 'ok').
--spec(declare/5 :: (realm_name(), name(), bool(), bool(), amqp_table()) ->
+-spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) ->
amqqueue()).
-spec(add_binding/4 ::
(queue_name(), exchange_name(), routing_key(), amqp_table()) ->
@@ -81,9 +86,9 @@
-spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok').
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok').
--spec(commit/2 :: (pid(), txn()) -> 'ok').
--spec(rollback/2 :: (pid(), txn()) -> 'ok').
--spec(notify_down/2 :: (amqqueue(), pid()) -> 'ok').
+-spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()).
+-spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()).
+-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
-spec(binding_forcibly_removed/2 :: (binding_spec(), queue_name()) -> 'ok').
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), bool()) ->
@@ -96,7 +101,7 @@
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (node()) -> 'ok').
--spec(pseudo_queue/3 :: (realm_name(), binary(), pid()) -> amqqueue()).
+-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
-endif.
@@ -130,9 +135,8 @@ recover_durable_queues() ->
ok
end).
-declare(RealmName, NameBin, Durable, AutoDelete, Args) ->
- QName = rabbit_misc:r(RealmName, queue, NameBin),
- Q = start_queue_process(#amqqueue{name = QName,
+declare(QueueName, Durable, AutoDelete, Args) ->
+ Q = start_queue_process(#amqqueue{name = QueueName,
durable = Durable,
auto_delete = AutoDelete,
arguments = Args,
@@ -140,9 +144,8 @@ declare(RealmName, NameBin, Durable, AutoDelete, Args) ->
pid = none}),
case rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:wread({amqqueue, QName}) of
+ case mnesia:wread({amqqueue, QueueName}) of
[] -> ok = recover_queue(Q),
- ok = rabbit_realm:add(RealmName, QName),
Q;
[ExistingQ] -> ExistingQ
end
@@ -251,7 +254,7 @@ with(Name, F, E) ->
end.
with(Name, F) ->
- with(Name, F, fun () -> {error, not_found} end).
+ with(Name, F, fun () -> {error, not_found} end).
with_or_die(Name, F) ->
with(Name, F, fun () -> rabbit_misc:protocol_error(
not_found, "no ~s", [rabbit_misc:rs(Name)])
@@ -289,14 +292,29 @@ requeue(QPid, MsgIds, ChPid) ->
ack(QPid, Txn, MsgIds, ChPid) ->
gen_server:cast(QPid, {ack, Txn, MsgIds, ChPid}).
-commit(QPid, Txn) ->
- gen_server:call(QPid, {commit, Txn}).
-
-rollback(QPid, Txn) ->
- gen_server:cast(QPid, {rollback, Txn}).
-
-notify_down(#amqqueue{ pid = QPid }, ChPid) ->
- gen_server:call(QPid, {notify_down, ChPid}).
+commit_all(QPids, Txn) ->
+ Timeout = length(QPids) * ?CALL_TIMEOUT,
+ safe_pmap_ok(
+ fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end,
+ QPids).
+
+rollback_all(QPids, Txn) ->
+ safe_pmap_ok(
+ fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end,
+ QPids).
+
+notify_down_all(QPids, ChPid) ->
+ Timeout = length(QPids) * ?CALL_TIMEOUT,
+ safe_pmap_ok(
+ fun (QPid) ->
+ rabbit_misc:with_exit_handler(
+ %% we don't care if the queue process has terminated
+ %% in the meantime
+ fun () -> ok end,
+ fun () -> gen_server:call(QPid, {notify_down, ChPid},
+ Timeout) end)
+ end,
+ QPids).
binding_forcibly_removed(BindingSpec, QueueName) ->
rabbit_misc:execute_mnesia_transaction(
@@ -338,28 +356,20 @@ internal_delete(QueueName) ->
case mnesia:wread({amqqueue, QueueName}) of
[] -> {error, not_found};
[Q] ->
- ok = delete_temp(Q),
+ ok = delete_queue(Q),
ok = mnesia:delete({durable_queues, QueueName}),
- ok = rabbit_realm:delete_from_all(QueueName),
ok
end
end).
-delete_temp(Q = #amqqueue{name = QueueName}) ->
+delete_queue(Q = #amqqueue{name = QueueName}) ->
ok = delete_bindings(Q),
ok = rabbit_exchange:delete_binding(
default_binding_spec(QueueName), Q),
ok = mnesia:delete({amqqueue, QueueName}),
ok.
-delete_queue(Q = #amqqueue{name = QueueName, durable = Durable}) ->
- ok = delete_temp(Q),
- if
- Durable -> ok;
- true -> ok = rabbit_realm:delete_from_all(QueueName)
- end.
-
-on_node_down(Node) ->
+on_node_down(Node) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
qlc:fold(
@@ -370,10 +380,23 @@ on_node_down(Node) ->
node(Pid) == Node]))
end).
-pseudo_queue(RealmName, NameBin, Pid) ->
- #amqqueue{name = rabbit_misc:r(RealmName, queue, NameBin),
+pseudo_queue(QueueName, Pid) ->
+ #amqqueue{name = QueueName,
durable = false,
auto_delete = false,
arguments = [],
binding_specs = [],
pid = Pid}.
+
+safe_pmap_ok(F, L) ->
+ case [R || R <- rabbit_misc:upmap(
+ fun (V) ->
+ try F(V)
+ catch Class:Reason -> {Class, Reason}
+ end
+ end, L),
+ R =/= ok] of
+ [] -> ok;
+ Errors -> {error, Errors}
+ end.
+
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index ec1d1fba..a9278898 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -37,7 +37,7 @@
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host,
- most_recently_declared_queue, consumer_mapping, next_ticket}).
+ most_recently_declared_queue, consumer_mapping}).
%%----------------------------------------------------------------------------
@@ -94,8 +94,7 @@ init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) ->
username = Username,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
- consumer_mapping = dict:new(),
- next_ticket = 101}.
+ consumer_mapping = dict:new()}.
handle_message({method, Method, Content}, State) ->
case (catch handle_method(Method, Content, State)) of
@@ -140,7 +139,6 @@ handle_message(Other, State) ->
terminate(Reason, State = #ch{writer_pid = WriterPid}) ->
Res = notify_queues(internal_rollback(State)),
- ok = rabbit_realm:leave_realms(self()),
case Reason of
normal -> ok = Res;
_ -> ok
@@ -195,14 +193,6 @@ die_precondition_failed(Fmt, Params) ->
rabbit_misc:protocol_error({false, 406, <<"PRECONDITION_FAILED">>},
Fmt, Params).
-check_ticket(TicketNumber, FieldIndex, Name, #ch{ username = Username}) ->
- rabbit_ticket:check_ticket(TicketNumber, FieldIndex, Name, Username).
-
-lookup_ticket(TicketNumber, FieldIndex,
- #ch{ username = Username, virtual_host = VHostPath }) ->
- rabbit_ticket:lookup_ticket(TicketNumber, FieldIndex,
- Username, VHostPath).
-
%% check that an exchange/queue name does not contain the reserved
%% "amq." prefix.
%%
@@ -235,57 +225,19 @@ handle_method(_Method, _, #ch{state = starting}) ->
handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
ok = notify_queues(internal_rollback(State)),
- ok = rabbit_realm:leave_realms(self()),
ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}),
ok = rabbit_writer:shutdown(WriterPid),
stop;
-handle_method(#'access.request'{realm = RealmNameBin,
- exclusive = Exclusive,
- passive = Passive,
- active = Active,
- write = Write,
- read = Read},
- _, State = #ch{username = Username,
- virtual_host = VHostPath,
- next_ticket = NextTicket}) ->
- RealmName = rabbit_misc:r(VHostPath, realm, RealmNameBin),
- Ticket = #ticket{realm_name = RealmName,
- passive_flag = Passive,
- active_flag = Active,
- write_flag = Write,
- read_flag = Read},
- case rabbit_realm:access_request(Username, Exclusive, Ticket) of
- ok ->
- rabbit_ticket:record_ticket(NextTicket, Ticket),
- NewState = State#ch{next_ticket = NextTicket + 1},
- {reply, #'access.request_ok'{ticket = NextTicket}, NewState};
- {error, not_found} ->
- rabbit_misc:protocol_error(
- invalid_path, "no ~s", [rabbit_misc:rs(RealmName)]);
- {error, bad_realm_path} ->
- %% FIXME: spec bug? access_refused is a soft error, spec requires it to be hard
- rabbit_misc:protocol_error(
- access_refused, "bad path for ~s", [rabbit_misc:rs(RealmName)]);
- {error, resource_locked} ->
- rabbit_misc:protocol_error(
- resource_locked, "~s is locked", [rabbit_misc:rs(RealmName)]);
- {error, access_refused} ->
- rabbit_misc:protocol_error(
- access_refused,
- "~w permissions denied for user '~s' attempting to access ~s",
- [rabbit_misc:permission_list(Ticket),
- Username, rabbit_misc:rs(RealmName)])
- end;
+handle_method(#'access.request'{},_, State) ->
+ {reply, #'access.request_ok'{ticket = 1}, State};
-handle_method(#'basic.publish'{ticket = TicketNumber,
- exchange = ExchangeNameBin,
+handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory,
immediate = Immediate},
Content, State = #ch{ virtual_host = VHostPath}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
- check_ticket(TicketNumber, #ticket.write_flag, ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
@@ -323,13 +275,11 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
uncommitted_ack_q = NewUAQ})
end};
-handle_method(#'basic.get'{ticket = TicketNumber,
- queue = QueueNameBin,
+handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
_, State = #ch{ proxy_pid = ProxyPid, writer_pid = WriterPid,
next_tag = DeliveryTag }) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
- check_ticket(TicketNumber, #ticket.read_flag, QueueName, State),
case rabbit_amqqueue:with_or_die(
QueueName,
fun (Q) -> rabbit_amqqueue:basic_get(Q, ProxyPid, NoAck) end) of
@@ -352,8 +302,7 @@ handle_method(#'basic.get'{ticket = TicketNumber,
{reply, #'basic.get_empty'{cluster_id = <<>>}, State}
end;
-handle_method(#'basic.consume'{ticket = TicketNumber,
- queue = QueueNameBin,
+handle_method(#'basic.consume'{queue = QueueNameBin,
consumer_tag = ConsumerTag,
no_local = _, % FIXME: implement
no_ack = NoAck,
@@ -365,7 +314,6 @@ handle_method(#'basic.consume'{ticket = TicketNumber,
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
- check_ticket(TicketNumber, #ticket.read_flag, QueueName, State),
ActualConsumerTag =
case ConsumerTag of
<<>> -> rabbit_misc:binstring_guid("amq.ctag");
@@ -391,7 +339,7 @@ handle_method(#'basic.consume'{ticket = TicketNumber,
ConsumerMapping)}};
{error, queue_owned_by_another_connection} ->
%% The spec is silent on which exception to use
- %% here. This seems reasonable?
+ %% here. This seems reasonable?
%% FIXME: check this
rabbit_misc:protocol_error(
@@ -495,8 +443,7 @@ handle_method(#'basic.recover'{}, _, _State) ->
rabbit_misc:protocol_error(
not_allowed, "attempt to recover a transactional channel",[]);
-handle_method(#'exchange.declare'{ticket = TicketNumber,
- exchange = ExchangeNameBin,
+handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
type = TypeNameBin,
passive = false,
durable = Durable,
@@ -505,17 +452,13 @@ handle_method(#'exchange.declare'{ticket = TicketNumber,
nowait = NoWait,
arguments = Args},
_, State = #ch{ virtual_host = VHostPath }) ->
- #ticket{realm_name = RealmName} =
- lookup_ticket(TicketNumber, #ticket.active_flag, State),
CheckedType = rabbit_exchange:check_type(TypeNameBin),
- %% FIXME: clarify spec as per declare wrt differing realms
- X = case rabbit_exchange:lookup(
- rabbit_misc:r(VHostPath, exchange, ExchangeNameBin)) of
+ ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ X = case rabbit_exchange:lookup(ExchangeName) of
{ok, FoundX} -> FoundX;
{error, not_found} ->
- ActualNameBin = check_name('exchange', ExchangeNameBin),
- rabbit_exchange:declare(RealmName,
- ActualNameBin,
+ check_name('exchange', ExchangeNameBin),
+ rabbit_exchange:declare(ExchangeName,
CheckedType,
Durable,
AutoDelete,
@@ -524,26 +467,21 @@ handle_method(#'exchange.declare'{ticket = TicketNumber,
ok = rabbit_exchange:assert_type(X, CheckedType),
return_ok(State, NoWait, #'exchange.declare_ok'{});
-handle_method(#'exchange.declare'{ticket = TicketNumber,
- exchange = ExchangeNameBin,
+handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
type = TypeNameBin,
passive = true,
nowait = NoWait},
_, State = #ch{ virtual_host = VHostPath }) ->
- %% FIXME: spec issue: permit active_flag here as well as passive_flag?
- #ticket{} = lookup_ticket(TicketNumber, #ticket.passive_flag, State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
X = rabbit_exchange:lookup_or_die(ExchangeName),
ok = rabbit_exchange:assert_type(X, rabbit_exchange:check_type(TypeNameBin)),
return_ok(State, NoWait, #'exchange.declare_ok'{});
-handle_method(#'exchange.delete'{ticket = TicketNumber,
- exchange = ExchangeNameBin,
+handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
if_unused = IfUnused,
nowait = NoWait},
_, State = #ch { virtual_host = VHostPath }) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
- check_ticket(TicketNumber, #ticket.active_flag, ExchangeName, State),
case rabbit_exchange:delete(ExchangeName, IfUnused) of
{error, not_found} ->
rabbit_misc:protocol_error(
@@ -555,8 +493,7 @@ handle_method(#'exchange.delete'{ticket = TicketNumber,
return_ok(State, NoWait, #'exchange.delete_ok'{})
end;
-handle_method(#'queue.declare'{ticket = TicketNumber,
- queue = QueueNameBin,
+handle_method(#'queue.declare'{queue = QueueNameBin,
passive = false,
durable = Durable,
exclusive = ExclusiveDeclare,
@@ -565,8 +502,6 @@ handle_method(#'queue.declare'{ticket = TicketNumber,
arguments = Args},
_, State = #ch { virtual_host = VHostPath,
reader_pid = ReaderPid }) ->
- #ticket{realm_name = RealmName} =
- lookup_ticket(TicketNumber, #ticket.active_flag, State),
%% FIXME: atomic create&claim
Finish =
fun (Q) ->
@@ -587,7 +522,6 @@ handle_method(#'queue.declare'{ticket = TicketNumber,
end,
Q
end,
- %% FIXME: clarify spec as per declare wrt differing realms
Q = case rabbit_amqqueue:with(
rabbit_misc:r(VHostPath, queue, QueueNameBin),
Finish) of
@@ -597,34 +531,28 @@ handle_method(#'queue.declare'{ticket = TicketNumber,
<<>> -> rabbit_misc:binstring_guid("amq.gen");
Other -> check_name('queue', Other)
end,
- Finish(rabbit_amqqueue:declare(RealmName,
- ActualNameBin,
- Durable,
- AutoDelete,
- Args));
+ QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
+ Finish(rabbit_amqqueue:declare(QueueName,
+ Durable, AutoDelete, Args));
Other -> Other
end,
return_queue_declare_ok(State, NoWait, Q);
-handle_method(#'queue.declare'{ticket = TicketNumber,
- queue = QueueNameBin,
+handle_method(#'queue.declare'{queue = QueueNameBin,
passive = true,
nowait = NoWait},
_, State = #ch{ virtual_host = VHostPath }) ->
- #ticket{} = lookup_ticket(TicketNumber, #ticket.passive_flag, State),
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
Q = rabbit_amqqueue:with_or_die(QueueName, fun (Q) -> Q end),
return_queue_declare_ok(State, NoWait, Q);
-handle_method(#'queue.delete'{ticket = TicketNumber,
- queue = QueueNameBin,
+handle_method(#'queue.delete'{queue = QueueNameBin,
if_unused = IfUnused,
if_empty = IfEmpty,
nowait = NoWait
},
_, State) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
- check_ticket(TicketNumber, #ticket.active_flag, QueueName, State),
case rabbit_amqqueue:with_or_die(
QueueName,
fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
@@ -640,8 +568,7 @@ handle_method(#'queue.delete'{ticket = TicketNumber,
message_count = PurgedMessageCount})
end;
-handle_method(#'queue.bind'{ticket = TicketNumber,
- queue = QueueNameBin,
+handle_method(#'queue.bind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
nowait = NoWait,
@@ -652,14 +579,13 @@ handle_method(#'queue.bind'{ticket = TicketNumber,
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey,
State),
- check_ticket(TicketNumber, #ticket.active_flag, QueueName, State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
case rabbit_amqqueue:add_binding(QueueName, ExchangeName,
ActualRoutingKey, Arguments) of
- {error, queue_not_found} ->
+ {error, queue_not_found} ->
rabbit_misc:protocol_error(
not_found, "no ~s", [rabbit_misc:rs(QueueName)]);
- {error, exchange_not_found} ->
+ {error, exchange_not_found} ->
rabbit_misc:protocol_error(
not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]);
{error, durability_settings_incompatible} ->
@@ -670,12 +596,10 @@ handle_method(#'queue.bind'{ticket = TicketNumber,
return_ok(State, NoWait, #'queue.bind_ok'{})
end;
-handle_method(#'queue.purge'{ticket = TicketNumber,
- queue = QueueNameBin,
+handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
_, State) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
- check_ticket(TicketNumber, #ticket.read_flag, QueueName, State),
{ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die(
QueueName,
fun (Q) -> rabbit_amqqueue:purge(Q) end),
@@ -783,21 +707,6 @@ ack(ProxyPid, TxnKey, UAQ) ->
make_tx_id() -> rabbit_misc:guid().
-safe_pmap_set_ok(F, S) ->
- case lists:filter(fun (R) -> R =/= ok end,
- rabbit_misc:upmap(
- fun (V) ->
- try F(V)
- catch Class:Reason -> {Class, Reason}
- end
- end, sets:to_list(S))) of
- [] -> ok;
- Errors -> {error, Errors}
- end.
-
-notify_participants(F, TxnKey, Participants) ->
- safe_pmap_set_ok(fun (QPid) -> F(QPid, TxnKey) end, Participants).
-
new_tx(State) ->
State#ch{transaction_id = make_tx_id(),
tx_participants = sets:new(),
@@ -805,8 +714,8 @@ new_tx(State) ->
internal_commit(State = #ch{transaction_id = TxnKey,
tx_participants = Participants}) ->
- case notify_participants(fun rabbit_amqqueue:commit/2,
- TxnKey, Participants) of
+ case rabbit_amqqueue:commit_all(sets:to_list(Participants),
+ TxnKey) of
ok -> new_tx(State);
{error, Errors} -> exit({commit_failed, Errors})
end.
@@ -819,8 +728,8 @@ internal_rollback(State = #ch{transaction_id = TxnKey,
[self(),
queue:len(UAQ),
queue:len(UAMQ)]),
- case notify_participants(fun rabbit_amqqueue:rollback/2,
- TxnKey, Participants) of
+ case rabbit_amqqueue:rollback_all(sets:to_list(Participants),
+ TxnKey) of
ok -> NewUAMQ = queue:join(UAQ, UAMQ),
new_tx(State#ch{unacked_message_q = NewUAMQ});
{error, Errors} -> exit({rollback_failed, Errors})
@@ -843,23 +752,18 @@ fold_per_queue(F, Acc0, UAQ) ->
Acc0, D).
notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) ->
- safe_pmap_set_ok(
- fun (QueueName) ->
- case rabbit_amqqueue:with(
- QueueName,
- fun (Q) ->
- rabbit_amqqueue:notify_down(Q, ProxyPid)
- end) of
- ok ->
- ok;
- {error, not_found} ->
- %% queue has been deleted in the meantime
- ok
- end
- end,
- dict:fold(fun (_ConsumerTag, QueueName, S) ->
- sets:add_element(QueueName, S)
- end, sets:new(), Consumers)).
+ rabbit_amqqueue:notify_down_all(
+ [QPid || QueueName <-
+ sets:to_list(
+ dict:fold(fun (_ConsumerTag, QueueName, S) ->
+ sets:add_element(QueueName, S)
+ end, sets:new(), Consumers)),
+ case rabbit_amqqueue:lookup(QueueName) of
+ {ok, Q} -> QPid = Q#amqqueue.pid, true;
+ %% queue has been deleted in the meantime
+ {error, not_found} -> QPid = none, false
+ end],
+ ProxyPid).
is_message_persistent(#content{properties = #'P_basic'{
delivery_mode = Mode}}) ->
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index ad796b61..bc588279 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -73,6 +73,7 @@ Available commands:
force_reset
cluster <ClusterNode> ...
status
+ rotate_logs [Suffix]
add_user <UserName> <Password>
delete_user <UserName>
@@ -88,17 +89,6 @@ Available commands:
list_user_vhosts <UserName>
list_vhost_users <VHostPath>
- add_realm <VHostPath> <RealmName>
- delete_realm <VHostPath> <RealmName>
- list_realms <VHostPath>
-
- set_permissions <UserName> <VHostPath> <RealmName> [<Permission> ...]
- Permissions management. The available permissions are 'passive',
- 'active', 'write' and 'read', corresponding to the permissions
- referred to in AMQP's \"access.request\" message, or 'all' as an
- abbreviation for all defined permission flags.
- list_permissions <UserName> <VHostPath>
-
<node> should be the name of the master node of the RabbitMQ cluster. It
defaults to the node named \"rabbit\" on the local host. On a host named
\"server.example.com\", the master node will usually be rabbit@server (unless
@@ -140,6 +130,13 @@ action(status, Node, []) ->
io:format("~n~p~n", [Res]),
ok;
+action(rotate_logs, Node, []) ->
+ io:format("Reopening logs for node ~p ...", [Node]),
+ call(Node, {rabbit, rotate_logs, [""]});
+action(rotate_logs, Node, Args = [Suffix]) ->
+ io:format("Rotating logs to files with suffix ~p ...", [Suffix]),
+ call(Node, {rabbit, rotate_logs, Args});
+
action(add_user, Node, Args = [Username, _Password]) ->
io:format("Creating user ~p ...", [Username]),
call(Node, {rabbit_access_control, add_user, Args});
@@ -182,68 +179,7 @@ action(list_user_vhosts, Node, Args = [_Username]) ->
action(list_vhost_users, Node, Args = [_VHostPath]) ->
io:format("Listing users for vhosts ~p...", Args),
- display_list(call(Node, {rabbit_access_control, list_vhost_users, Args}));
-
-action(add_realm, Node, [VHostPath, RealmName]) ->
- io:format("Adding realm ~p to vhost ~p ...", [RealmName, VHostPath]),
- rpc_call(Node, rabbit_realm, add_realm,
- [realm_rsrc(VHostPath, RealmName)]);
-
-action(delete_realm, Node, [VHostPath, RealmName]) ->
- io:format("Deleting realm ~p from vhost ~p ...", [RealmName, VHostPath]),
- rpc_call(Node, rabbit_realm, delete_realm,
- [realm_rsrc(VHostPath, RealmName)]);
-
-action(list_realms, Node, Args = [_VHostPath]) ->
- io:format("Listing realms for vhost ~p ...", Args),
- display_list(call(Node, {rabbit_realm, list_vhost_realms, Args}));
-
-action(set_permissions, Node,
- [Username, VHostPath, RealmName | Permissions]) ->
- io:format("Setting permissions for user ~p, vhost ~p, realm ~p ...",
- [Username, VHostPath, RealmName]),
- CheckedPermissions = check_permissions(Permissions),
- Ticket = #ticket{
- realm_name = realm_rsrc(VHostPath, RealmName),
- passive_flag = lists:member(passive, CheckedPermissions),
- active_flag = lists:member(active, CheckedPermissions),
- write_flag = lists:member(write, CheckedPermissions),
- read_flag = lists:member(read, CheckedPermissions)},
- rpc_call(Node, rabbit_access_control, map_user_realm,
- [list_to_binary(Username), Ticket]);
-
-action(list_permissions, Node, Args = [_Username, _VHostPath]) ->
- io:format("Listing permissions for user ~p in vhost ~p ...", Args),
- Perms = call(Node, {rabbit_access_control, list_user_realms, Args}),
- if is_list(Perms) ->
- lists:foreach(
- fun ({RealmName, Pattern}) ->
- io:format("~n~s: ~p",
- [binary_to_list(RealmName),
- rabbit_misc:permission_list(Pattern)])
- end,
- lists:sort(Perms)),
- io:nl(),
- ok;
- true -> Perms
- end.
-
-check_permissions([]) -> [];
-check_permissions(["all" | R]) ->
- [passive, active, write, read | check_permissions(R)];
-check_permissions([P | R]) when (P == "passive") or
- (P == "active") or
- (P == "write") or
- (P == "read") ->
- [list_to_atom(P) | check_permissions(R)];
-check_permissions([P | _R]) ->
- io:format("~nError: invalid permission flag ~p~n", [P]),
- usage().
-
-realm_rsrc(VHostPath, RealmName) ->
- rabbit_misc:r(list_to_binary(VHostPath),
- realm,
- list_to_binary(RealmName)).
+ display_list(call(Node, {rabbit_access_control, list_vhost_users, Args})).
display_list(L) when is_list(L) ->
lists:foreach(fun (I) ->
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index 0ae116bb..9220d7b4 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -34,10 +34,7 @@
init([DefaultVHost]) ->
#exchange{} = rabbit_exchange:declare(
- #resource{virtual_host = DefaultVHost,
- kind = realm,
- name = <<"/admin">>},
- ?LOG_EXCH_NAME,
+ rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
topic, true, false, []),
{ok, #resource{virtual_host = DefaultVHost,
kind = exchange,
diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl
new file mode 100644
index 00000000..d67b02ef
--- /dev/null
+++ b/src/rabbit_error_logger_file_h.erl
@@ -0,0 +1,74 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd.,
+%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd., Cohesive Financial Technologies
+%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008
+%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
+%% Technologies Ltd.;
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_error_logger_file_h).
+
+-behaviour(gen_event).
+
+-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, code_change/3]).
+
+%% rabbit_error_logger_file_h is a wrapper around the error_logger_file_h
+%% module because the original's init/1 does not match properly
+%% with the result of closing the old handler when swapping handlers.
+%% The first init/1 additionally allows for simple log rotation
+%% when the suffix is not the empty string.
+
+%% Used only when swapping handlers in log rotation
+init({{File, Suffix}, []}) ->
+ case rabbit_misc:append_file(File, Suffix) of
+ ok -> ok;
+ {error, Error} ->
+ rabbit_log:error("Failed to append contents of " ++
+ "log file '~s' to '~s':~n~p~n",
+ [File, [File, Suffix], Error])
+ end,
+ init(File);
+%% Used only when swapping handlers and the original handler
+%% failed to terminate or was never installed
+init({{File, _}, error}) ->
+ init(File);
+%% Used only when swapping handlers without performing
+%% log rotation
+init({File, []}) ->
+ init(File);
+init({_File, _Type} = FileInfo) ->
+ error_logger_file_h:init(FileInfo);
+init(File) ->
+ error_logger_file_h:init(File).
+
+handle_event(Event, State) ->
+ error_logger_file_h:handle_event(Event, State).
+
+handle_info(Event, State) ->
+ error_logger_file_h:handle_info(Event, State).
+
+handle_call(Event, State) ->
+ error_logger_file_h:handle_call(Event, State).
+
+terminate(Reason, State) ->
+ error_logger_file_h:terminate(Reason, State).
+
+code_change(OldVsn, State, Extra) ->
+ error_logger_file_h:code_change(OldVsn, State, Extra).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 113b7878..bb132a50 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -28,7 +28,7 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([recover/0, declare/6, lookup/1, lookup_or_die/1,
+-export([recover/0, declare/5, lookup/1, lookup_or_die/1,
list_vhost_exchanges/1, list_exchange_bindings/1,
simple_publish/6, simple_publish/3,
route/2]).
@@ -50,21 +50,21 @@
not_found() | {'error', 'unroutable' | 'not_delivered'}).
-spec(recover/0 :: () -> 'ok').
--spec(declare/6 :: (realm_name(), name(), exchange_type(), bool(), bool(),
+-spec(declare/5 :: (exchange_name(), exchange_type(), bool(), bool(),
amqp_table()) -> exchange()).
-spec(check_type/1 :: (binary()) -> atom()).
--spec(assert_type/2 :: (exchange(), atom()) -> 'ok').
+-spec(assert_type/2 :: (exchange(), atom()) -> 'ok').
-spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()).
-spec(lookup_or_die/1 :: (exchange_name()) -> exchange()).
-spec(list_vhost_exchanges/1 :: (vhost()) -> [exchange()]).
--spec(list_exchange_bindings/1 :: (exchange_name()) ->
+-spec(list_exchange_bindings/1 :: (exchange_name()) ->
[{queue_name(), routing_key(), amqp_table()}]).
-spec(simple_publish/6 ::
(bool(), bool(), exchange_name(), routing_key(), binary(), binary()) ->
publish_res()).
-spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()).
-spec(route/2 :: (exchange(), routing_key()) -> [pid()]).
--spec(add_binding/2 :: (binding_spec(), amqqueue()) ->
+-spec(add_binding/2 :: (binding_spec(), amqqueue()) ->
'ok' | not_found() |
{'error', 'durability_settings_incompatible'}).
-spec(delete_binding/2 :: (binding_spec(), amqqueue()) ->
@@ -90,23 +90,21 @@ recover_durable_exchanges() ->
end, ok, durable_exchanges)
end).
-declare(RealmName, NameBin, Type, Durable, AutoDelete, Args) ->
- XName = rabbit_misc:r(RealmName, exchange, NameBin),
- Exchange = #exchange{name = XName,
+declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
+ Exchange = #exchange{name = ExchangeName,
type = Type,
durable = Durable,
auto_delete = AutoDelete,
arguments = Args},
rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:wread({exchange, XName}) of
+ case mnesia:wread({exchange, ExchangeName}) of
[] -> ok = mnesia:write(Exchange),
if Durable ->
ok = mnesia:write(
durable_exchanges, Exchange, write);
true -> ok
end,
- ok = rabbit_realm:add(RealmName, XName),
Exchange;
[ExistingX] -> ExistingX
end
@@ -147,15 +145,14 @@ list_vhost_exchanges(VHostPath) ->
list_exchange_bindings(Name) ->
[{QueueName, RoutingKey, Arguments} ||
- #binding{handlers = Handlers} <- bindings_for_exchange(Name),
- #handler{binding_spec = #binding_spec{routing_key = RoutingKey,
- arguments = Arguments},
- queue = QueueName} <- Handlers].
+ #binding{handlers = Handlers} <- bindings_for_exchange(Name),
+ #handler{binding_spec = #binding_spec{routing_key = RoutingKey,
+ arguments = Arguments},
+ queue = QueueName} <- Handlers].
bindings_for_exchange(Name) ->
- qlc:e(qlc:q([B ||
- B = #binding{key = K} <- mnesia:table(binding),
- element(1, K) == Name])).
+ qlc:e(qlc:q([B || B = #binding{key = K} <- mnesia:table(binding),
+ element(1, K) == Name])).
empty_handlers() ->
[].
@@ -187,7 +184,7 @@ simple_publish(Mandatory, Immediate,
%% return the list of qpids to which a message with a given routing
%% key, sent to a particular exchange, should be delivered.
-%%
+%%
%% The function ensures that a qpid appears in the return list exactly
%% as many times as a message should be delivered to it. With the
%% current exchange types that is at most once.
@@ -197,7 +194,7 @@ route(#exchange{name = Name, type = topic}, RoutingKey) ->
mnesia:activity(
async_dirty,
fun () ->
- qlc:e(qlc:q([handler_qpids(H) ||
+ qlc:e(qlc:q([handler_qpids(H) ||
#binding{key = {Name1, PatternKey},
handlers = H}
<- mnesia:table(binding),
@@ -375,6 +372,5 @@ do_internal_delete(ExchangeName, Bindings) ->
ok = mnesia:delete({binding, K})
end, Bindings),
ok = mnesia:delete({durable_exchanges, ExchangeName}),
- ok = mnesia:delete({exchange, ExchangeName}),
- ok = rabbit_realm:delete_from_all(ExchangeName)
+ ok = mnesia:delete({exchange, ExchangeName})
end.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index b71aba42..89648f4f 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -26,23 +26,23 @@
-module(rabbit_misc).
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
+-include_lib("kernel/include/file.hrl").
-export([method_record_type/1, polite_pause/0, polite_pause/1]).
-export([die/1, frame_error/2, protocol_error/3, protocol_error/4]).
--export([strict_ticket_checking/0]).
-export([get_config/1, get_config/2, set_config/2]).
-export([dirty_read/1]).
-export([r/3, r/2, rs/1]).
--export([permission_list/1]).
-export([enable_cover/0, report_cover/0]).
-export([throw_on_error/2, with_exit_handler/2]).
--export([with_user/2, with_vhost/2, with_realm/2, with_user_and_vhost/3]).
+-export([with_user/2, with_vhost/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
-export([ensure_ok/2]).
-export([localnode/1, tcp_name/3]).
-export([intersperse/2, upmap/2, map_in_order/2]).
-export([guid/0, string_guid/1, binstring_guid/1]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
+-export([append_file/2]).
-import(mnesia).
-import(lists).
@@ -64,34 +64,30 @@
(atom() | amqp_error(), string(), [any()]) -> no_return()).
-spec(protocol_error/4 ::
(atom() | amqp_error(), string(), [any()], atom()) -> no_return()).
--spec(strict_ticket_checking/0 :: () -> bool()).
-spec(get_config/1 :: (atom()) -> {'ok', any()} | not_found()).
-spec(get_config/2 :: (atom(), A) -> A).
-spec(set_config/2 :: (atom(), any()) -> 'ok').
-spec(dirty_read/1 :: ({atom(), any()}) -> {'ok', any()} | not_found()).
--spec(r/3 :: (realm_name() | vhost(), K, name()) ->
- r(K) when is_subtype(K, atom())).
+-spec(r/3 :: (vhost(), K, resource_name()) -> r(K) when is_subtype(K, atom())).
-spec(r/2 :: (vhost(), K) -> #resource{virtual_host :: vhost(),
kind :: K,
name :: '_'}
when is_subtype(K, atom())).
--spec(rs/1 :: (r(atom())) -> string()).
--spec(permission_list/1 :: (ticket()) -> [permission()]).
+-spec(rs/1 :: (r(atom())) -> string()).
-spec(enable_cover/0 :: () -> 'ok' | {'error', any()}).
-spec(report_cover/0 :: () -> 'ok').
-spec(throw_on_error/2 ::
(atom(), thunk({error, any()} | {ok, A} | A)) -> A).
--spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
--spec(with_user/2 :: (username(), thunk(A)) -> A).
+-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
+-spec(with_user/2 :: (username(), thunk(A)) -> A).
-spec(with_vhost/2 :: (vhost(), thunk(A)) -> A).
--spec(with_realm/2 :: (realm_name(), thunk(A)) -> A).
--spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A).
+-spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A).
-spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A).
--spec(ensure_ok/2 :: ('ok' | {'error', any()}, atom()) -> 'ok').
+-spec(ensure_ok/2 :: ('ok' | {'error', any()}, atom()) -> 'ok').
-spec(localnode/1 :: (atom()) -> node()).
--spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()).
+-spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()).
-spec(intersperse/2 :: (A, [A]) -> [A]).
--spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
+-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(guid/0 :: () -> guid()).
-spec(string_guid/1 :: (any()) -> string()).
@@ -100,6 +96,7 @@
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) ->
'ok' | 'aborted').
-spec(dirty_dump_log/1 :: (string()) -> 'ok' | {'error', any()}).
+-spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}).
-endif.
@@ -130,24 +127,6 @@ protocol_error(Error, Explanation, Params, Method) ->
CompleteExplanation = lists:flatten(io_lib:format(Explanation, Params)),
exit({amqp, Error, CompleteExplanation, Method}).
-boolean_config_param(Name, TrueValue, FalseValue, DefaultValue) ->
- ActualValue = get_config(Name, DefaultValue),
- if
- ActualValue == TrueValue ->
- true;
- ActualValue == FalseValue ->
- false;
- true ->
- rabbit_log:error(
- "Bad setting for config param '~w': ~p~n" ++
- "legal values are '~w', '~w'; using default value '~w'",
- [Name, ActualValue, TrueValue, FalseValue, DefaultValue]),
- DefaultValue == TrueValue
- end.
-
-strict_ticket_checking() ->
- boolean_config_param(strict_ticket_checking, enabled, disabled, disabled).
-
get_config(Key) ->
case dirty_read({rabbit_config, Key}) of
{ok, {rabbit_config, Key, V}} -> {ok, V};
@@ -182,19 +161,6 @@ rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) ->
lists:flatten(io_lib:format("~s '~s' in vhost '~s'",
[Kind, Name, VHostPath])).
-permission_list(Ticket = #ticket{}) ->
- lists:foldr(fun ({Field, Label}, L) ->
- case element(Field, Ticket) of
- true -> [Label | L];
- false -> L
- end
- end,
- [],
- [{#ticket.passive_flag, passive},
- {#ticket.active_flag, active},
- {#ticket.write_flag, write},
- {#ticket.read_flag, read}]).
-
enable_cover() ->
case cover:compile_beam_directory("ebin") of
{error,Reason} -> {error,Reason};
@@ -260,32 +226,13 @@ with_user(Username, Thunk) ->
with_vhost(VHostPath, Thunk) ->
fun () ->
case mnesia:read({vhost, VHostPath}) of
- [] ->
+ [] ->
mnesia:abort({no_such_vhost, VHostPath});
[_V] ->
Thunk()
end
end.
-with_realm(Name = #resource{virtual_host = VHostPath, kind = realm},
- Thunk) ->
- fun () ->
- case mnesia:read({realm, Name}) of
- [] ->
- mnesia:abort({no_such_realm, Name});
- [_R] ->
- case mnesia:match_object(
- #vhost_realm{virtual_host = VHostPath,
- realm = Name}) of
- [] ->
- %% This should never happen
- mnesia:abort({no_such_realm, Name});
- [_VR] ->
- Thunk()
- end
- end
- end.
-
with_user_and_vhost(Username, VHostPath, Thunk) ->
with_user(Username, with_vhost(VHostPath, Thunk)).
@@ -398,3 +345,24 @@ dirty_dump_log1(LH, {K, Terms}) ->
dirty_dump_log1(LH, {K, Terms, BadBytes}) ->
io:format("Bad Chunk, ~p: ~p~n", [BadBytes, Terms]),
dirty_dump_log1(LH, disk_log:chunk(LH, K)).
+
+
+append_file(File, Suffix) ->
+ case file:read_file_info(File) of
+ {ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix);
+ {error, enoent} -> append_file(File, 0, Suffix);
+ Error -> Error
+ end.
+
+append_file(_, _, "") ->
+ ok;
+append_file(File, 0, Suffix) ->
+ case file:open([File, Suffix], [append]) of
+ {ok, Fd} -> file:close(Fd);
+ Error -> Error
+ end;
+append_file(File, _, Suffix) ->
+ case file:read_file(File) of
+ {ok, Data} -> file:write_file([File, Suffix], Data, [append]);
+ Error -> Error
+ end.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 82b80cb4..4ae367ba 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -102,23 +102,6 @@ table_definitions() ->
{index, [virtual_host]}]},
{vhost, [{disc_copies, [node()]},
{attributes, record_info(fields, vhost)}]},
- {vhost_realm, [{type, bag},
- {disc_copies, [node()]},
- {attributes, record_info(fields, vhost_realm)},
- {index, [realm]}]},
- {realm, [{disc_copies, [node()]},
- {attributes, record_info(fields, realm)}]},
- {user_realm, [{type, bag},
- {disc_copies, [node()]},
- {attributes, record_info(fields, user_realm)},
- {index, [realm]}]},
- {exclusive_realm_visitor,
- [{record_name, realm_visitor},
- {attributes, record_info(fields, realm_visitor)},
- {index, [pid]}]},
- {realm_visitor, [{type, bag},
- {attributes, record_info(fields, realm_visitor)},
- {index, [pid]}]},
{rabbit_config, [{disc_copies, [node()]}]},
{listener, [{type, bag},
{attributes, record_info(fields, listener)}]},
@@ -257,7 +240,6 @@ init_db(ClusterNodes) ->
ClusterNodes}})
end;
{ok, [_|_]} ->
- ok = ensure_schema_integrity(),
ok = wait_for_tables(),
ok = create_local_table_copies(
case IsDiskNode of
@@ -341,6 +323,7 @@ create_local_table_copy(Tab, Type) ->
ok.
wait_for_tables() ->
+ ok = ensure_schema_integrity(),
case mnesia:wait_for_tables(table_names(), 30000) of
ok -> ok;
{timeout, BadTabs} ->
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index cd92f1ac..c6a7e920 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -70,7 +70,9 @@ usage() ->
Available commands:
start_all <NodeCount> - start a local cluster of RabbitMQ nodes.
+ status - print status of all running nodes
stop_all - stops all local RabbitMQ nodes.
+ rotate_logs [Suffix] - rotate logs for all local and running RabbitMQ nodes.
"),
halt(3).
@@ -87,13 +89,46 @@ action(start_all, [NodeCount], RpcTimeout) ->
false -> timeout
end;
+action(status, [], RpcTimeout) ->
+ io:format("Status of all running nodes...~n", []),
+ call_all_nodes(
+ fun({Node, Pid}) ->
+ Status = rpc:call(Node, rabbit, status, [], RpcTimeout),
+ io:format("Node '~p' with Pid ~p: ~p~n",
+ [Node, Pid, case parse_status(Status) of
+ false -> not_running;
+ true -> running
+ end])
+ end);
+
action(stop_all, [], RpcTimeout) ->
io:format("Stopping all nodes...~n", []),
- case read_pids_file() of
- [] -> throw(no_nodes_running);
- NodePids -> stop_nodes(NodePids, RpcTimeout),
- delete_pids_file()
- end.
+ call_all_nodes(fun({Node, Pid}) ->
+ io:format("Stopping node ~p~n", [Node]),
+ rpc:call(Node, rabbit, stop_and_halt, []),
+ case kill_wait(Pid, RpcTimeout, false) of
+ false -> kill_wait(Pid, RpcTimeout, true);
+ true -> ok
+ end,
+ io:format("OK~n", [])
+ end),
+ delete_pids_file();
+
+action(rotate_logs, [], RpcTimeout) ->
+ action(rotate_logs, [""], RpcTimeout);
+
+action(rotate_logs, [Suffix], RpcTimeout) ->
+ io:format("Rotating logs for all nodes...~n", []),
+ BinarySuffix = list_to_binary(Suffix),
+ call_all_nodes(
+ fun ({Node, _}) ->
+ io:format("Rotating logs for node ~p", [Node]),
+ case rpc:call(Node, rabbit, rotate_logs,
+ [BinarySuffix], RpcTimeout) of
+ {badrpc, Error} -> io:format(": ~p.~n", [Error]);
+ ok -> io:format(": ok.~n", [])
+ end
+ end).
%% PNodePid is the list of PIDs
%% Running is a boolean exhibiting success at some moment
@@ -222,21 +257,6 @@ read_pids_file() ->
FileName, Reason}})
end.
-stop_nodes([],_) -> ok;
-
-stop_nodes([NodePid | Rest], RpcTimeout) ->
- stop_node(NodePid, RpcTimeout),
- stop_nodes(Rest, RpcTimeout).
-
-stop_node({Node, Pid}, RpcTimeout) ->
- io:format("Stopping node ~p~n", [Node]),
- rpc:call(Node, rabbit, stop_and_halt, []),
- case kill_wait(Pid, RpcTimeout, false) of
- false -> kill_wait(Pid, RpcTimeout, true);
- true -> ok
- end,
- io:format("OK~n", []).
-
kill_wait(Pid, TimeLeft, Forceful) when TimeLeft < 0 ->
Cmd = with_os([{unix, fun () -> if Forceful -> "kill -9";
true -> "kill"
@@ -272,6 +292,12 @@ is_dead(Pid) ->
end
end}]).
+call_all_nodes(Func) ->
+ case read_pids_file() of
+ [] -> throw(no_nodes_running);
+ NodePids -> lists:foreach(Func, NodePids)
+ end.
+
getenv(Var) ->
case os:getenv(Var) of
false -> throw({missing_env_var, Var});
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index beef5285..2fb582a9 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -60,7 +60,6 @@ handle_info({nodedown, Node}, State) ->
%% lots of nodes. We really only need to execute this code on
%% *one* node, rather than all of them.
ok = rabbit_networking:on_node_down(Node),
- ok = rabbit_realm:on_node_down(Node),
ok = rabbit_amqqueue:on_node_down(Node),
{noreply, State};
handle_info(_Info, State) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 38349a1c..ce26c11a 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -59,6 +59,7 @@
%% all states, unless specified otherwise:
%% socket error -> *exit*
%% socket close -> *throw*
+%% writer send failure -> *throw*
%% forced termination -> *exit*
%% handshake_timeout -> *throw*
%% pre-init:
@@ -93,10 +94,18 @@
%% terminate_channel timeout -> remove 'closing' mark, *closing*
%% handshake_timeout -> ignore, *closing*
%% heartbeat timeout -> *throw*
-%% channel exit ->
-%% if abnormal exit then log error
-%% if last channel to exit then send connection.close_ok, start
-%% terminate_connection timer, *closing*
+%% channel exit with hard error
+%% -> log error, wait for channels to terminate forcefully, start
+%% terminate_connection timer, send close, *closed*
+%% channel exit with soft error
+%% -> log error, start terminate_channel timer, mark channel as
+%% closing
+%% if last channel to exit then send connection.close_ok,
+%% start terminate_connection timer, *closed*
+%% else *closing*
+%% channel exits normally
+%% -> if last channel to exit then send connection.close_ok,
+%% start terminate_connection timer, *closed*
%% closed:
%% socket close -> *terminate*
%% receive connection.close_ok -> self() ! terminate_connection,
@@ -243,6 +252,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
%% since this termination is initiated by our parent it is
%% probably more important to exit quickly.
exit(Reason);
+ {'EXIT', _Pid, E = {writer, send_failed, _Error}} ->
+ throw(E);
{'EXIT', Pid, Reason} ->
mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State));
{terminate_channel, Channel, Ref1} ->
@@ -302,24 +313,13 @@ terminate_channel(Channel, Ref, State) ->
end,
State.
-handle_dependent_exit(Pid, Reason,
- State = #v1{connection_state = closing}) ->
- case channel_cleanup(Pid) of
- undefined -> exit({abnormal_dependent_exit, Pid, Reason});
- Channel ->
- case Reason of
- normal -> ok;
- _ -> log_channel_error(closing, Channel, Reason)
- end,
- maybe_close(State)
- end;
handle_dependent_exit(Pid, normal, State) ->
channel_cleanup(Pid),
- State;
+ maybe_close(State);
handle_dependent_exit(Pid, Reason, State) ->
case channel_cleanup(Pid) of
undefined -> exit({abnormal_dependent_exit, Pid, Reason});
- Channel -> handle_exception(State, Channel, Reason)
+ Channel -> maybe_close(handle_exception(State, Channel, Reason))
end.
channel_cleanup(Pid) ->
@@ -376,13 +376,15 @@ wait_for_channel_termination(N, TimerRef) ->
exit(channel_termination_timeout)
end.
-maybe_close(State) ->
+maybe_close(State = #v1{connection_state = closing}) ->
case all_channels() of
[] -> ok = send_on_channel0(
State#v1.sock, #'connection.close_ok'{}),
close_connection(State);
_ -> State
- end.
+ end;
+maybe_close(State) ->
+ State.
handle_frame(Type, 0, Payload, State = #v1{connection_state = CS})
when CS =:= closing; CS =:= closed ->
diff --git a/src/rabbit_realm.erl b/src/rabbit_realm.erl
deleted file mode 100644
index 4463954d..00000000
--- a/src/rabbit_realm.erl
+++ /dev/null
@@ -1,316 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd.,
-%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd., Cohesive Financial Technologies
-%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008
-%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
-%% Technologies Ltd.;
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_realm).
-
--export([recover/0]).
--export([add_realm/1, delete_realm/1, list_vhost_realms/1]).
--export([add/2, delete/2, check/2, delete_from_all/1]).
--export([access_request/3, enter_realm/3, leave_realms/1]).
--export([on_node_down/1]).
-
--include("rabbit.hrl").
--include_lib("stdlib/include/qlc.hrl").
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--type(e_or_q() :: 'exchange' | 'queue').
-
--spec(recover/0 :: () -> 'ok').
--spec(add_realm/1 :: (realm_name()) -> 'ok').
--spec(delete_realm/1 :: (realm_name()) -> 'ok').
--spec(list_vhost_realms/1 :: (vhost()) -> [name()]).
--spec(add/2 :: (realm_name(), r(e_or_q())) -> 'ok').
--spec(delete/2 :: (realm_name(), r(e_or_q())) -> 'ok').
--spec(check/2 :: (realm_name(), r(e_or_q())) -> bool() | not_found()).
--spec(delete_from_all/1 :: (r(e_or_q())) -> 'ok').
--spec(access_request/3 :: (username(), bool(), ticket()) ->
- 'ok' | not_found() | {'error', 'bad_realm_path' |
- 'access_refused' |
- 'resource_locked'}).
--spec(enter_realm/3 :: (realm_name(), bool(), pid()) ->
- 'ok' | {'error', 'resource_locked'}).
--spec(leave_realms/1 :: (pid()) -> 'ok').
--spec(on_node_down/1 :: (node()) -> 'ok').
-
--endif.
-
-%%--------------------------------------------------------------------
-
-recover() ->
- %% preens resource lists, limiting them to currently-extant resources
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- Realms = mnesia:foldl(fun preen_realm/2, [], realm),
- lists:foreach(fun mnesia:write/1, Realms),
- ok
- end).
-
-add_realm(Name = #resource{virtual_host = VHostPath, kind = realm}) ->
- rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_vhost(
- VHostPath,
- fun () ->
- case mnesia:read({realm, Name}) of
- [] ->
- NewRealm = #realm{name = Name,
- exchanges = ordsets:new(),
- queues = ordsets:new()},
- ok = mnesia:write(NewRealm),
- ok = mnesia:write(
- #vhost_realm{virtual_host = VHostPath,
- realm = Name}),
- ok;
- [_R] ->
- mnesia:abort({realm_already_exists, Name})
- end
- end)).
-
-delete_realm(Name = #resource{virtual_host = VHostPath, kind = realm}) ->
- rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_vhost(
- VHostPath,
- rabbit_misc:with_realm(
- Name,
- fun () ->
- ok = mnesia:delete({realm, Name}),
- ok = mnesia:delete_object(
- #vhost_realm{virtual_host = VHostPath,
- realm = Name}),
- lists:foreach(fun mnesia:delete_object/1,
- mnesia:index_read(user_realm, Name,
- #user_realm.realm)),
- ok
- end))).
-
-list_vhost_realms(VHostPath) ->
- [Name ||
- #vhost_realm{realm = #resource{name = Name}} <-
- %% TODO: use dirty ops instead
- rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_vhost(
- VHostPath,
- fun () -> mnesia:read({vhost_realm, VHostPath}) end))].
-
-add(Name = #resource{kind = realm}, Resource) ->
- internal_update_realm_byname(Name, Resource, fun ordsets:add_element/2).
-
-delete(Name = #resource{kind = realm}, Resource) ->
- internal_update_realm_byname(Name, Resource, fun ordsets:del_element/2).
-
-check(Name = #resource{kind = realm}, Resource = #resource{kind = Kind}) ->
- case rabbit_misc:dirty_read({realm, Name}) of
- {ok, R} ->
- case Kind of
- exchange -> ordsets:is_element(Resource, R#realm.exchanges);
- queue -> ordsets:is_element(Resource, R#realm.queues)
- end;
- Other -> Other
- end.
-
-% Requires a mnesia transaction.
-delete_from_all(Resource = #resource{kind = Kind}) ->
- Realms = mnesia:foldl
- (fun (Realm = #realm{exchanges = E0,
- queues = Q0},
- Acc) ->
- IsMember = lists:member(Resource,
- case Kind of
- exchange -> E0;
- queue -> Q0
- end),
- if
- IsMember ->
- [internal_update_realm_record(
- Realm, Resource,
- fun ordsets:del_element/2)
- | Acc];
- true ->
- Acc
- end
- end, [], realm),
- lists:foreach(fun mnesia:write/1, Realms),
- ok.
-
-access_request(Username, Exclusive, Ticket = #ticket{realm_name = RealmName})
- when is_binary(Username) ->
- %% FIXME: We should do this all in a single tx. Otherwise we may
- %% a) get weird answers, b) create inconsistencies in the db
- %% (e.g. realm_visitor records referring to non-existing realms).
- case check_and_lookup(RealmName) of
- {error, Reason} ->
- {error, Reason};
- {ok, _Realm} ->
- {ok, U} = rabbit_access_control:lookup_user(Username),
- case rabbit_access_control:lookup_realm_access(U, RealmName) of
- none ->
- {error, access_refused};
- TicketPattern ->
- case match_ticket(TicketPattern, Ticket) of
- no_match ->
- {error, access_refused};
- match ->
- enter_realm(RealmName, Exclusive, self())
- end
- end
- end.
-
-enter_realm(Name = #resource{kind = realm}, IsExclusive, Pid) ->
- RealmVisitor = #realm_visitor{realm = Name, pid = Pid},
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:read({exclusive_realm_visitor, Name}) of
- [] when IsExclusive ->
- ok = mnesia:delete_object(RealmVisitor),
- %% TODO: find a more efficient way of checking
- %% for "no machting results" that doesn't
- %% involve retrieving all the records
- case mnesia:read({realm_visitor, Name}) of
- [] ->
- mnesia:write(
- exclusive_realm_visitor, RealmVisitor, write),
- ok;
- [_|_] ->
- {error, resource_locked}
- end;
- [] ->
- ok = mnesia:write(RealmVisitor),
- ok;
- [RealmVisitor] when IsExclusive -> ok;
- [RealmVisitor] ->
- ok = mnesia:delete({exclusive_realm_visitor, Name}),
- ok = mnesia:write(RealmVisitor),
- ok;
- [_] ->
- {error, resource_locked}
- end
- end).
-
-leave_realms(Pid) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:index_read(exclusive_realm_visitor, Pid,
- #realm_visitor.pid) of
- [] -> ok;
- [R] ->
- ok = mnesia:delete_object(
- exclusive_realm_visitor, R, write)
- end,
- lists:foreach(fun mnesia:delete_object/1,
- mnesia:index_read(realm_visitor, Pid,
- #realm_visitor.pid)),
- ok
- end).
-
-on_node_down(Node) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- lists:foreach(
- fun (T) -> ok = remove_visitors(Node, T) end,
- [exclusive_realm_visitor, realm_visitor]),
- ok
- end).
-
-%%--------------------------------------------------------------------
-
-preen_realm(Realm = #realm{name = #resource{kind = realm},
- exchanges = E0,
- queues = Q0},
- Realms) ->
- [Realm#realm{exchanges = filter_out_missing(E0, exchange),
- queues = filter_out_missing(Q0, amqqueue)}
- | Realms].
-
-filter_out_missing(Items, TableName) ->
- ordsets:filter(fun (Item) ->
- case mnesia:read({TableName, Item}) of
- [] -> false;
- _ -> true
- end
- end, Items).
-
-internal_update_realm_byname(Name, Resource, SetUpdater) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:read({realm, Name}) of
- [] ->
- mnesia:abort(not_found);
- [R] ->
- ok = mnesia:write(internal_update_realm_record
- (R, Resource, SetUpdater))
- end
- end).
-
-internal_update_realm_record(R = #realm{exchanges = E0, queues = Q0},
- Resource = #resource{kind = Kind},
- SetUpdater) ->
- case Kind of
- exchange -> R#realm{exchanges = SetUpdater(Resource, E0)};
- queue -> R#realm{queues = SetUpdater(Resource, Q0)}
- end.
-
-check_and_lookup(RealmName = #resource{kind = realm,
- name = <<"/data", _/binary>>}) ->
- lookup(RealmName);
-check_and_lookup(RealmName = #resource{kind = realm,
- name = <<"/admin", _/binary>>}) ->
- lookup(RealmName);
-check_and_lookup(_) ->
- {error, bad_realm_path}.
-
-lookup(Name = #resource{kind = realm}) ->
- rabbit_misc:dirty_read({realm, Name}).
-
-match_ticket(#ticket{passive_flag = PP,
- active_flag = PA,
- write_flag = PW,
- read_flag = PR},
- #ticket{passive_flag = TP,
- active_flag = TA,
- write_flag = TW,
- read_flag = TR}) ->
- if
- %% Matches if either we're not requesting passive access, or
- %% passive access is permitted, and ...
- (not(TP) orelse PP) andalso
- (not(TA) orelse PA) andalso
- (not(TW) orelse PW) andalso
- (not(TR) orelse PR) ->
- match;
- true ->
- no_match
- end.
-
-remove_visitors(Node, T) ->
- qlc:fold(
- fun (R, Acc) ->
- ok = mnesia:delete_object(T, R, write),
- Acc
- end,
- ok,
- qlc:q([R || R = #realm_visitor{pid = Pid} <- mnesia:table(T),
- node(Pid) == Node])).
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 41a8d64c..a2337647 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -150,11 +150,9 @@ run_bindings(QPids, IsMandatory, IsImmediate, Txn, Message) ->
fun (QPid, {Routed, Handled}) ->
case catch rabbit_amqqueue:deliver(IsMandatory, IsImmediate,
Txn, Message, QPid) of
- true -> {true, [QPid | Handled]};
- false -> {true, Handled};
- {'EXIT', Reason} -> rabbit_log:warning("delivery to ~p failed:~n~p~n",
- [QPid, Reason]),
- {Routed, Handled}
+ true -> {true, [QPid | Handled]};
+ false -> {true, Handled};
+ {'EXIT', _Reason} -> {Routed, Handled}
end
end,
{false, []},
diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl
new file mode 100644
index 00000000..3374d63d
--- /dev/null
+++ b/src/rabbit_sasl_report_file_h.erl
@@ -0,0 +1,86 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd.,
+%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd., Cohesive Financial Technologies
+%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008
+%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
+%% Technologies Ltd.;
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_sasl_report_file_h).
+
+-behaviour(gen_event).
+
+-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, code_change/3]).
+
+%% rabbit_sasl_report_file_h is a wrapper around the sasl_report_file_h
+%% module because the original's init/1 does not match properly
+%% with the result of closing the old handler when swapping handlers.
+%% The first init/1 additionally allows for simple log rotation
+%% when the suffix is not the empty string.
+
+%% Used only when swapping handlers and performing
+%% log rotation
+init({{File, Suffix}, []}) ->
+ case rabbit_misc:append_file(File, Suffix) of
+ ok -> ok;
+ {error, Error} ->
+ rabbit_log:error("Failed to append contents of " ++
+ "sasl log file '~s' to '~s':~n~p~n",
+ [File, [File, Suffix], Error])
+ end,
+ init(File);
+%% Used only when swapping handlers and the original handler
+%% failed to terminate or was never installed
+init({{File, _}, error}) ->
+ init(File);
+%% Used only when swapping handlers without
+%% doing any log rotation
+init({File, []}) ->
+ init(File);
+init({_File, _Type} = FileInfo) ->
+ sasl_report_file_h:init(FileInfo);
+init(File) ->
+ sasl_report_file_h:init({File, sasl_error_logger_type()}).
+
+handle_event(Event, State) ->
+ sasl_report_file_h:handle_event(Event, State).
+
+handle_info(Event, State) ->
+ sasl_report_file_h:handle_info(Event, State).
+
+handle_call(Event, State) ->
+ sasl_report_file_h:handle_call(Event, State).
+
+terminate(Reason, State) ->
+ sasl_report_file_h:terminate(Reason, State).
+
+code_change(OldVsn, State, Extra) ->
+ sasl_report_file_h:code_change(OldVsn, State, Extra).
+
+%%----------------------------------------------------------------------
+
+sasl_error_logger_type() ->
+ case application:get_env(sasl, errlog_type) of
+ {ok, error} -> error;
+ {ok, progress} -> progress;
+ {ok, all} -> all;
+ {ok, Bad} -> throw({error, {wrong_errlog_type, Bad}});
+ _ -> all
+ end.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index beeb3508..fff02d73 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -29,6 +29,8 @@
-import(lists).
+-include_lib("kernel/include/file.hrl").
+
test_content_prop_roundtrip(Datum, Binary) ->
Types = [element(1, E) || E <- Datum],
Values = [element(2, E) || E <- Datum],
@@ -38,7 +40,9 @@ test_content_prop_roundtrip(Datum, Binary) ->
all_tests() ->
passed = test_parsing(),
passed = test_topic_matching(),
+ passed = test_log_management(),
passed = test_app_management(),
+ passed = test_log_management_during_startup(),
passed = test_cluster_management(),
passed = test_user_management(),
passed.
@@ -136,6 +140,134 @@ test_app_management() ->
ok = control_action(status, []),
passed.
+test_log_management() ->
+ MainLog = rabbit:log_location(kernel),
+ SaslLog = rabbit:log_location(sasl),
+ Suffix = ".1",
+
+ %% prepare basic logs
+ file:delete([MainLog, Suffix]),
+ file:delete([SaslLog, Suffix]),
+
+ %% simple logs reopening
+ ok = control_action(rotate_logs, []),
+ [true, true] = empty_files([MainLog, SaslLog]),
+ ok = test_logs_working(MainLog, SaslLog),
+
+ %% simple log rotation
+ ok = control_action(rotate_logs, [Suffix]),
+ [true, true] = non_empty_files([[MainLog, Suffix], [SaslLog, Suffix]]),
+ [true, true] = empty_files([MainLog, SaslLog]),
+ ok = test_logs_working(MainLog, SaslLog),
+
+ %% reopening logs with log rotation performed first
+ ok = clean_logs([MainLog, SaslLog], Suffix),
+ ok = control_action(rotate_logs, []),
+ ok = file:rename(MainLog, [MainLog, Suffix]),
+ ok = file:rename(SaslLog, [SaslLog, Suffix]),
+ ok = test_logs_working([MainLog, Suffix], [SaslLog, Suffix]),
+ ok = control_action(rotate_logs, []),
+ ok = test_logs_working(MainLog, SaslLog),
+
+ %% log rotation on empty file
+ ok = clean_logs([MainLog, SaslLog], Suffix),
+ ok = control_action(rotate_logs, []),
+ ok = control_action(rotate_logs, [Suffix]),
+ [true, true] = empty_files([[MainLog, Suffix], [SaslLog, Suffix]]),
+
+ %% original main log file is not writable
+ ok = make_files_non_writable([MainLog]),
+ {error, {cannot_rotate_main_logs, _}} = control_action(rotate_logs, []),
+ ok = clean_logs([MainLog], Suffix),
+ ok = add_log_handlers([{rabbit_error_logger_file_h, MainLog}]),
+
+ %% original sasl log file is not writable
+ ok = make_files_non_writable([SaslLog]),
+ {error, {cannot_rotate_sasl_logs, _}} = control_action(rotate_logs, []),
+ ok = clean_logs([SaslLog], Suffix),
+ ok = add_log_handlers([{rabbit_sasl_report_file_h, SaslLog}]),
+
+ %% logs with suffix are not writable
+ ok = control_action(rotate_logs, [Suffix]),
+ ok = make_files_non_writable([[MainLog, Suffix], [SaslLog, Suffix]]),
+ ok = control_action(rotate_logs, [Suffix]),
+ ok = test_logs_working(MainLog, SaslLog),
+
+ %% original log files are not writable
+ ok = make_files_non_writable([MainLog, SaslLog]),
+ {error, {{cannot_rotate_main_logs, _},
+ {cannot_rotate_sasl_logs, _}}} = control_action(rotate_logs, []),
+
+ %% logging directed to tty (handlers were removed in last test)
+ ok = clean_logs([MainLog, SaslLog], Suffix),
+ ok = application:set_env(sasl, sasl_error_logger, tty),
+ ok = application:set_env(kernel, error_logger, tty),
+ ok = control_action(rotate_logs, []),
+ [{error, enoent}, {error, enoent}] = empty_files([MainLog, SaslLog]),
+
+ %% rotate logs when logging is turned off
+ ok = application:set_env(sasl, sasl_error_logger, false),
+ ok = application:set_env(kernel, error_logger, silent),
+ ok = control_action(rotate_logs, []),
+ [{error, enoent}, {error, enoent}] = empty_files([MainLog, SaslLog]),
+
+ %% cleanup
+ ok = application:set_env(sasl, sasl_error_logger, {file, SaslLog}),
+ ok = application:set_env(kernel, error_logger, {file, MainLog}),
+ ok = add_log_handlers([{rabbit_error_logger_file_h, MainLog},
+ {rabbit_sasl_report_file_h, SaslLog}]),
+ passed.
+
+test_log_management_during_startup() ->
+ MainLog = rabbit:log_location(kernel),
+ SaslLog = rabbit:log_location(sasl),
+
+ %% start application with simple tty logging
+ ok = control_action(stop_app, []),
+ ok = application:set_env(kernel, error_logger, tty),
+ ok = application:set_env(sasl, sasl_error_logger, tty),
+ ok = add_log_handlers([{error_logger_tty_h, []},
+ {sasl_report_tty_h, []}]),
+ ok = control_action(start_app, []),
+
+ %% start application with tty logging and
+ %% proper handlers not installed
+ ok = control_action(stop_app, []),
+ ok = error_logger:tty(false),
+ ok = delete_log_handlers([sasl_report_tty_h]),
+ ok = case catch control_action(start_app, []) of
+ ok -> exit(got_success_but_expected_failure);
+ {error, {cannot_log_to_tty, _, _}} -> ok
+ end,
+
+ %% fix sasl logging
+ ok = application:set_env(sasl, sasl_error_logger,
+ {file, SaslLog}),
+
+ %% start application with logging to invalid directory
+ TmpLog = "/tmp/rabbit-tests/test.log",
+ file:delete(TmpLog),
+ ok = application:set_env(kernel, error_logger, {file, TmpLog}),
+
+ ok = delete_log_handlers([rabbit_error_logger_file_h]),
+ ok = add_log_handlers([{error_logger_file_h, MainLog}]),
+ ok = case catch control_action(start_app, []) of
+ ok -> exit(got_success_but_expected_failure);
+ {error, {cannot_log_to_file, _, _}} -> ok
+ end,
+
+ %% start application with standard error_logger_file_h
+ %% handler not installed
+ ok = application:set_env(kernel, error_logger, {file, MainLog}),
+ ok = control_action(start_app, []),
+ ok = control_action(stop_app, []),
+
+ %% start application with standard sasl handler not installed
+ %% and rabbit main log handler installed correctly
+ ok = delete_log_handlers([rabbit_sasl_report_file_h]),
+ ok = control_action(start_app, []),
+ passed.
+
test_cluster_management() ->
%% 'cluster' and 'reset' should only work if the app is stopped
@@ -203,7 +335,6 @@ test_cluster_management() ->
end,
ok = control_action(start_app, []),
-
passed.
test_cluster_management2(SecondaryNode) ->
@@ -284,31 +415,12 @@ test_user_management() ->
control_action(unmap_user_vhost, ["foo", "/"]),
{error, {no_such_user, _}} =
control_action(list_user_vhosts, ["foo"]),
- {error, {no_such_user, _}} =
- control_action(set_permissions, ["foo", "/", "/data"]),
- {error, {no_such_user, _}} =
- control_action(list_permissions, ["foo", "/"]),
{error, {no_such_vhost, _}} =
control_action(map_user_vhost, ["guest", "/testhost"]),
{error, {no_such_vhost, _}} =
control_action(unmap_user_vhost, ["guest", "/testhost"]),
{error, {no_such_vhost, _}} =
control_action(list_vhost_users, ["/testhost"]),
- {error, {no_such_vhost, _}} =
- control_action(set_permissions, ["guest", "/testhost", "/data"]),
- {error, {no_such_vhost, _}} =
- control_action(list_permissions, ["guest", "/testhost"]),
- {error, {no_such_vhost, _}} =
- control_action(add_realm, ["/testhost", "/data/test"]),
- {error, {no_such_vhost, _}} =
- control_action(delete_realm, ["/testhost", "/data/test"]),
- {error, {no_such_vhost, _}} =
- control_action(list_realms, ["/testhost"]),
- {error, {no_such_realm, _}} =
- control_action(set_permissions, ["guest", "/", "/data/test"]),
- {error, {no_such_realm, _}} =
- control_action(delete_realm, ["/", "/data/test"]),
-
%% user creation
ok = control_action(add_user, ["foo", "bar"]),
{error, {user_already_exists, _}} =
@@ -327,32 +439,6 @@ test_user_management() ->
ok = control_action(map_user_vhost, ["foo", "/testhost"]),
ok = control_action(list_user_vhosts, ["foo"]),
- %% realm creation
- ok = control_action(add_realm, ["/testhost", "/data/test"]),
- {error, {realm_already_exists, _}} =
- control_action(add_realm, ["/testhost", "/data/test"]),
- ok = control_action(list_realms, ["/testhost"]),
-
- %% user permissions
- ok = control_action(set_permissions,
- ["foo", "/testhost", "/data/test",
- "passive", "active", "write", "read"]),
- ok = control_action(list_permissions, ["foo", "/testhost"]),
- ok = control_action(set_permissions,
- ["foo", "/testhost", "/data/test", "all"]),
- ok = control_action(set_permissions,
- ["foo", "/testhost", "/data/test"]),
- {error, not_mapped_to_vhost} =
- control_action(set_permissions,
- ["guest", "/testhost", "/data/test"]),
- {error, not_mapped_to_vhost} =
- control_action(list_permissions, ["guest", "/testhost"]),
-
- %% realm deletion
- ok = control_action(delete_realm, ["/testhost", "/data/test"]),
- {error, {no_such_realm, _}} =
- control_action(delete_realm, ["/testhost", "/data/test"]),
-
%% user/vhost unmapping
ok = control_action(unmap_user_vhost, ["foo", "/testhost"]),
ok = control_action(unmap_user_vhost, ["foo", "/testhost"]),
@@ -364,13 +450,7 @@ test_user_management() ->
%% deleting a populated vhost
ok = control_action(add_vhost, ["/testhost"]),
- ok = control_action(add_realm, ["/testhost", "/data/test"]),
ok = control_action(map_user_vhost, ["foo", "/testhost"]),
- ok = control_action(set_permissions,
- ["foo", "/testhost", "/data/test", "all"]),
- _ = rabbit_amqqueue:declare(
- rabbit_misc:r(<<"/testhost">>, realm, <<"/data/test">>),
- <<"bar">>, true, false, []),
ok = control_action(delete_vhost, ["/testhost"]),
%% user deletion
@@ -380,6 +460,8 @@ test_user_management() ->
passed.
+%---------------------------------------------------------------------
+
control_action(Command, Args) -> control_action(Command, node(), Args).
control_action(Command, Node, Args) ->
@@ -391,3 +473,52 @@ control_action(Command, Node, Args) ->
io:format("failed.~n"),
Other
end.
+
+empty_files(Files) ->
+ [case file:read_file_info(File) of
+ {ok, FInfo} -> FInfo#file_info.size == 0;
+ Error -> Error
+ end || File <- Files].
+
+non_empty_files(Files) ->
+ [case EmptyFile of
+ {error, Reason} -> {error, Reason};
+ _ -> not(EmptyFile)
+ end || EmptyFile <- empty_files(Files)].
+
+test_logs_working(MainLogFile, SaslLogFile) ->
+ ok = rabbit_log:error("foo bar"),
+ ok = error_logger:error_report(crash_report, [foo, bar]),
+ %% give the error loggers some time to catch up
+ timer:sleep(50),
+ [true, true] = non_empty_files([MainLogFile, SaslLogFile]),
+ ok.
+
+clean_logs(Files, Suffix) ->
+ [begin
+ ok = delete_file(File),
+ ok = delete_file([File, Suffix])
+ end || File <- Files],
+ ok.
+
+delete_file(File) ->
+ case file:delete(File) of
+ ok -> ok;
+ {error, enoent} -> ok;
+ Error -> Error
+ end.
+
+make_files_non_writable(Files) ->
+ [ok = file:write_file_info(File, #file_info{mode=0}) ||
+ File <- Files],
+ ok.
+
+add_log_handlers(Handlers) ->
+ [ok = error_logger:add_report_handler(Handler, Args) ||
+ {Handler, Args} <- Handlers],
+ ok.
+
+delete_log_handlers(Handlers) ->
+ [[] = error_logger:delete_report_handler(Handler) ||
+ Handler <- Handlers],
+ ok.
diff --git a/src/rabbit_ticket.erl b/src/rabbit_ticket.erl
deleted file mode 100644
index 3a608faa..00000000
--- a/src/rabbit_ticket.erl
+++ /dev/null
@@ -1,131 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd.,
-%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd., Cohesive Financial Technologies
-%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008
-%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
-%% Technologies Ltd.;
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_ticket).
--include("rabbit.hrl").
-
--export([record_ticket/2, lookup_ticket/4, check_ticket/4]).
-
--import(application).
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--type(ticket_number() :: non_neg_integer()).
-%% we'd like to write #ticket.passive_flag | #ticket.active_flag | ...
-%% but dialyzer doesn't support that.
--type(ticket_field() :: 3..6).
-
--spec(record_ticket/2 :: (ticket_number(), ticket()) -> 'ok').
--spec(lookup_ticket/4 ::
- (ticket_number(), ticket_field(), username(), vhost()) ->
- ticket()).
--spec(check_ticket/4 ::
- (ticket_number(), ticket_field(), r('exchange' | 'queue'), username()) ->
- 'ok').
-
--endif.
-
-%%----------------------------------------------------------------------------
-
-record_ticket(TicketNumber, Ticket) ->
- put({ticket, TicketNumber}, Ticket),
- ok.
-
-lookup_ticket(TicketNumber, FieldIndex, Username, VHostPath) ->
- case get({ticket, TicketNumber}) of
- undefined ->
- %% Spec: "The server MUST isolate access tickets per
- %% channel and treat an attempt by a client to mix these
- %% as a connection exception."
- rabbit_log:warning("Attempt by client to use invalid ticket ~p~n", [TicketNumber]),
- maybe_relax_checks(TicketNumber, Username, VHostPath);
- Ticket = #ticket{} ->
- case element(FieldIndex, Ticket) of
- false -> rabbit_misc:protocol_error(
- access_refused,
- "ticket ~w has insufficient permissions",
- [TicketNumber]);
- true -> Ticket
- end
- end.
-
-maybe_relax_checks(TicketNumber, Username, VHostPath) ->
- case rabbit_misc:strict_ticket_checking() of
- true ->
- rabbit_misc:protocol_error(
- access_refused, "invalid ticket ~w", [TicketNumber]);
- false ->
- rabbit_log:warning("Lax ticket check mode: fabricating full ticket ~p for user ~p, vhost ~p~n",
- [TicketNumber, Username, VHostPath]),
- Ticket = rabbit_access_control:full_ticket(
- rabbit_misc:r(VHostPath, realm, <<"/data">>)),
- case rabbit_realm:access_request(Username, false, Ticket) of
- ok -> record_ticket(TicketNumber, Ticket),
- Ticket;
- {error, Reason} ->
- rabbit_misc:protocol_error(
- Reason,
- "fabrication of ticket ~w for user '~s' in vhost '~s' failed",
- [TicketNumber, Username, VHostPath])
- end
- end.
-
-check_ticket(TicketNumber, FieldIndex,
- Name = #resource{virtual_host = VHostPath}, Username) ->
- #ticket{realm_name = RealmName} =
- lookup_ticket(TicketNumber, FieldIndex, Username, VHostPath),
- case resource_in_realm(RealmName, Name) of
- false ->
- case rabbit_misc:strict_ticket_checking() of
- true ->
- rabbit_misc:protocol_error(
- access_refused,
- "insufficient permissions in ticket ~w to access ~s in ~s",
- [TicketNumber, rabbit_misc:rs(Name),
- rabbit_misc:rs(RealmName)]);
- false ->
- rabbit_log:warning("Lax ticket check mode: ignoring cross-realm access for ticket ~p~n", [TicketNumber]),
- ok
- end;
- true ->
- ok
- end.
-
-resource_in_realm(RealmName, ResourceName = #resource{kind = Kind}) ->
- CacheKey = {resource_cache, RealmName, Kind},
- case get(CacheKey) of
- Name when Name == ResourceName ->
- true;
- _ ->
- case rabbit_realm:check(RealmName, ResourceName) of
- true ->
- put(CacheKey, ResourceName),
- true;
- _ ->
- false
- end
- end.
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index c3c7db53..2c7fa2ab 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -157,10 +157,15 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) ->
%% when these are full. So the fact that we process the result
%% asynchronously does not impact flow control.
internal_send_command_async(Sock, Channel, MethodRecord) ->
- true = erlang:port_command(Sock, assemble_frames(Channel, MethodRecord)),
+ true = port_cmd(Sock, assemble_frames(Channel, MethodRecord)),
ok.
internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) ->
- true = erlang:port_command(Sock, assemble_frames(Channel, MethodRecord,
- Content, FrameMax)),
+ true = port_cmd(Sock, assemble_frames(Channel, MethodRecord,
+ Content, FrameMax)),
ok.
+
+port_cmd(Sock, Data) ->
+ try erlang:port_command(Sock, Data)
+ catch error:Error -> exit({writer, send_failed, Error})
+ end.