diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 167 | ||||
| -rw-r--r-- | src/rabbit_access_control.erl | 124 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 97 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 178 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 82 | ||||
| -rw-r--r-- | src/rabbit_error_logger.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_error_logger_file_h.erl | 74 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 38 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 100 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_multi.erl | 66 | ||||
| -rw-r--r-- | src/rabbit_node_monitor.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 40 | ||||
| -rw-r--r-- | src/rabbit_realm.erl | 316 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_sasl_report_file_h.erl | 86 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 235 | ||||
| -rw-r--r-- | src/rabbit_ticket.erl | 131 | ||||
| -rw-r--r-- | src/rabbit_writer.erl | 11 |
19 files changed, 696 insertions, 1082 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index e65d532b..c6ef1749 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -27,10 +27,12 @@ -behaviour(application). --export([start/0, stop/0, stop_and_halt/0, status/0]). +-export([start/0, stop/0, stop_and_halt/0, status/0, rotate_logs/1]). -export([start/2, stop/1]). +-export([log_location/1]). + -import(application). -import(mnesia). -import(lists). @@ -46,13 +48,18 @@ -ifdef(use_specs). +-type(log_location() :: 'tty' | 'undefined' | string()). +-type(file_suffix() :: binary()). + -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_halt/0 :: () -> 'ok'). +-spec(rotate_logs/1 :: (file_suffix()) -> 'ok' | {'error', any()}). -spec(status/0 :: () -> [{running_applications, [{atom(), string(), string()}]} | {nodes, [node()]} | {running_nodes, [node()]}]). +-spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()). -endif. @@ -60,7 +67,7 @@ start() -> try - ok = ensure_working_log_config(), + ok = ensure_working_log_handlers(), ok = rabbit_mnesia:ensure_mnesia_dir(), ok = start_applications(?APPS) after @@ -85,6 +92,15 @@ status() -> [{running_applications, application:which_applications()}] ++ rabbit_mnesia:status(). +rotate_logs(BinarySuffix) -> + Suffix = binary_to_list(BinarySuffix), + log_rotation_result(rotate_logs(log_location(kernel), + Suffix, + rabbit_error_logger_file_h), + rotate_logs(log_location(sasl), + Suffix, + rabbit_sasl_report_file_h)). + %%-------------------------------------------------------------------- manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) -> @@ -98,7 +114,7 @@ manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) -> end end, [], Apps), ok. - + start_applications(Apps) -> manage_applications(fun lists:foldl/3, fun application:start/1, @@ -128,9 +144,9 @@ start(normal, []) -> io:format("starting ~-20s ...", [Msg]), Thunk(), io:format("done~n"); - ({Msg, M, F, A}) -> + ({Msg, M, F, A}) -> io:format("starting ~-20s ...", [Msg]), - apply(M, F, A), + apply(M, F, A), io:format("done~n") end, [{"database", @@ -150,14 +166,12 @@ start(normal, []) -> {"recovery", fun () -> ok = maybe_insert_default_data(), - ok = rabbit_exchange:recover(), - ok = rabbit_amqqueue:recover(), - ok = rabbit_realm:recover() + ok = rabbit_amqqueue:recover() end}, {"persister", - fun () -> - ok = start_child(rabbit_persister) + fun () -> + ok = start_child(rabbit_persister) end}, {"builtin applications", fun () -> @@ -188,6 +202,21 @@ stop(_State) -> %--------------------------------------------------------------------------- +log_location(Type) -> + case application:get_env(Type, case Type of + kernel -> error_logger; + sasl -> sasl_error_logger + end) of + {ok, {file, File}} -> File; + {ok, false} -> undefined; + {ok, tty} -> tty; + {ok, silent} -> undefined; + {ok, Bad} -> throw({error, {cannot_log_to_file, Bad}}); + _ -> undefined + end. + +%--------------------------------------------------------------------------- + print_banner() -> {ok, Product} = application:get_key(id), {ok, Version} = application:get_key(vsn), @@ -196,7 +225,9 @@ print_banner() -> ?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]), io:format("Logging to ~p~nSASL logging to ~p~n~n", - [error_log_location(), sasl_log_location()]). + [log_location(kernel), log_location(sasl)]). + + start_child(Mod) -> {ok,_} = supervisor:start_child(rabbit_sup, @@ -204,6 +235,43 @@ start_child(Mod) -> transient, 100, worker, [Mod]}), ok. +ensure_working_log_handlers() -> + Handlers = gen_event:which_handlers(error_logger), + ok = ensure_working_log_handler(error_logger_file_h, + rabbit_error_logger_file_h, + error_logger_tty_h, + log_location(kernel), + Handlers), + + ok = ensure_working_log_handler(sasl_report_file_h, + rabbit_sasl_report_file_h, + sasl_report_tty_h, + log_location(sasl), + Handlers), + ok. + +ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler, + LogLocation, Handlers) -> + case LogLocation of + undefined -> ok; + tty -> case lists:member(TTYHandler, Handlers) of + true -> ok; + false -> + throw({error, {cannot_log_to_tty, + TTYHandler, not_installed}}) + end; + _ -> case lists:member(NewFHandler, Handlers) of + true -> ok; + false -> case rotate_logs(LogLocation, "", + OldFHandler, NewFHandler) of + ok -> ok; + {error, Reason} -> + throw({error, {cannot_log_to_file, + LogLocation, Reason}}) + end + end + end. + maybe_insert_default_data() -> case rabbit_mnesia:is_db_empty() of true -> insert_default_data(); @@ -215,26 +283,8 @@ insert_default_data() -> {ok, DefaultPass} = application:get_env(default_pass), {ok, DefaultVHost} = application:get_env(default_vhost), ok = rabbit_access_control:add_vhost(DefaultVHost), - ok = insert_default_user(DefaultUser, DefaultPass, - [{DefaultVHost, [<<"/data">>, <<"/admin">>]}]), - ok. - -insert_default_user(Username, Password, VHostSpecs) -> - ok = rabbit_access_control:add_user(Username, Password), - lists:foreach( - fun ({VHostPath, Realms}) -> - ok = rabbit_access_control:map_user_vhost( - Username, VHostPath), - lists:foreach( - fun (Realm) -> - RealmFullName = - rabbit_misc:r(VHostPath, realm, Realm), - ok = rabbit_access_control:map_user_realm( - Username, - rabbit_access_control:full_ticket( - RealmFullName)) - end, Realms) - end, VHostSpecs), + ok = rabbit_access_control:add_user(DefaultUser, DefaultPass), + ok = rabbit_access_control:map_user_vhost(DefaultUser, DefaultVHost), ok. start_builtin_amq_applications() -> @@ -243,40 +293,25 @@ start_builtin_amq_applications() -> %%restart ok. -ensure_working_log_config() -> - case error_logger:logfile(filename) of - {error, no_log_file} -> - %% either no log file was configured or opening it failed. - case application:get_env(kernel, error_logger) of - {ok, {file, Filename}} -> - case filelib:ensure_dir(Filename) of - ok -> ok; - {error, Reason1} -> - throw({error, {cannot_log_to_file, - Filename, Reason1}}) - end, - case error_logger:logfile({open, Filename}) of - ok -> ok; - {error, Reason2} -> - throw({error, {cannot_log_to_file, - Filename, Reason2}}) - end; - _ -> ok - end; - _Filename -> ok - end. - -error_log_location() -> - case error_logger:logfile(filename) of - {error,no_log_file} -> tty; - File -> File +rotate_logs(File, Suffix, Handler) -> + rotate_logs(File, Suffix, Handler, Handler). + +rotate_logs(File, Suffix, OldHandler, NewHandler) -> + case File of + undefined -> ok; + tty -> ok; + _ -> gen_event:swap_handler( + error_logger, + {OldHandler, swap}, + {NewHandler, {File, Suffix}}) end. -sasl_log_location() -> - case application:get_env(sasl, sasl_error_logger) of - {ok, {file, File}} -> File; - {ok, false} -> undefined; - {ok, tty} -> tty; - {ok, Bad} -> throw({error, {cannot_log_to_file, Bad}}); - _ -> undefined - end. +log_rotation_result({error, MainLogError}, {error, SaslLogError}) -> + {error, {{cannot_rotate_main_logs, MainLogError}, + {cannot_rotate_sasl_logs, SaslLogError}}}; +log_rotation_result({error, MainLogError}, ok) -> + {error, {cannot_rotate_main_logs, MainLogError}}; +log_rotation_result(ok, {error, SaslLogError}) -> + {error, {cannot_rotate_sasl_logs, SaslLogError}}; +log_rotation_result(ok, ok) -> + ok. diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 2be07b19..4342e15b 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -28,12 +28,11 @@ -include("rabbit.hrl"). -export([check_login/2, user_pass_login/2, - check_vhost_access/2, lookup_realm_access/2]). + check_vhost_access/2]). -export([add_user/2, delete_user/1, change_password/2, list_users/0, lookup_user/1]). -export([add_vhost/1, delete_vhost/1, list_vhosts/0, list_vhost_users/1]). -export([list_user_vhosts/1, map_user_vhost/2, unmap_user_vhost/2]). --export([list_user_realms/2, map_user_realm/2, full_ticket/1]). %%---------------------------------------------------------------------------- @@ -42,7 +41,6 @@ -spec(check_login/2 :: (binary(), binary()) -> user()). -spec(user_pass_login/2 :: (username(), password()) -> user()). -spec(check_vhost_access/2 :: (user(), vhost()) -> 'ok'). --spec(lookup_realm_access/2 :: (user(), realm_name()) -> maybe(ticket())). -spec(add_user/2 :: (username(), password()) -> 'ok'). -spec(delete_user/1 :: (username()) -> 'ok'). -spec(change_password/2 :: (username(), password()) -> 'ok'). @@ -55,9 +53,6 @@ -spec(list_user_vhosts/1 :: (username()) -> [vhost()]). -spec(map_user_vhost/2 :: (username(), vhost()) -> 'ok'). -spec(unmap_user_vhost/2 :: (username(), vhost()) -> 'ok'). --spec(map_user_realm/2 :: (username(), ticket()) -> 'ok'). --spec(list_user_realms/2 :: (username(), vhost()) -> [{name(), ticket()}]). --spec(full_ticket/1 :: (realm_name()) -> ticket()). -endif. @@ -87,7 +82,7 @@ check_login(<<"AMQPLAIN">>, Response) -> [LoginTable]) end; -check_login(Mechanism, _Response) -> +check_login(Mechanism, _Response) -> rabbit_misc:protocol_error( access_refused, "unsupported authentication mechanism '~s'", [Mechanism]). @@ -130,18 +125,6 @@ check_vhost_access(#user{username = Username}, VHostPath) -> [VHostPath, Username]) end. -lookup_realm_access(#user{username = Username}, RealmName = #resource{kind = realm}) -> - %% TODO: use dirty ops instead - rabbit_misc:execute_mnesia_transaction( - fun () -> - case user_realms(Username, RealmName) of - [] -> - none; - [#user_realm{ticket_pattern = TicketPattern}] -> - TicketPattern - end - end). - add_user(Username, Password) -> R = rabbit_misc:execute_mnesia_transaction( fun () -> @@ -162,8 +145,7 @@ delete_user(Username) -> Username, fun () -> ok = mnesia:delete({user, Username}), - ok = mnesia:delete({user_vhost, Username}), - ok = mnesia:delete({user_realm, Username}) + ok = mnesia:delete({user_vhost, Username}) end)), rabbit_log:info("Deleted user ~p~n", [Username]), R. @@ -191,24 +173,14 @@ add_vhost(VHostPath) -> case mnesia:read({vhost, VHostPath}) of [] -> ok = mnesia:write(#vhost{virtual_host = VHostPath}), - DataRealm = - rabbit_misc:r(VHostPath, realm, <<"/data">>), - AdminRealm = - rabbit_misc:r(VHostPath, realm, <<"/admin">>), - ok = rabbit_realm:add_realm(DataRealm), - ok = rabbit_realm:add_realm(AdminRealm), - #exchange{} = rabbit_exchange:declare( - DataRealm, <<"">>, - direct, true, false, []), - #exchange{} = rabbit_exchange:declare( - DataRealm, <<"amq.direct">>, - direct, true, false, []), - #exchange{} = rabbit_exchange:declare( - DataRealm, <<"amq.topic">>, - topic, true, false, []), - #exchange{} = rabbit_exchange:declare( - DataRealm, <<"amq.fanout">>, - fanout, true, false, []), + [rabbit_exchange:declare( + rabbit_misc:r(VHostPath, exchange, Name), + Type, true, false, []) || + {Name,Type} <- + [{<<"">>, direct}, + {<<"amq.direct">>, direct}, + {<<"amq.topic">>, topic}, + {<<"amq.fanout">>, fanout}]], ok; [_] -> mnesia:abort({vhost_already_exists, VHostPath}) @@ -240,11 +212,6 @@ internal_delete_vhost(VHostPath) -> ok = rabbit_exchange:delete(Name, false) end, rabbit_exchange:list_vhost_exchanges(VHostPath)), - lists:foreach(fun (RealmName) -> - ok = rabbit_realm:delete_realm( - rabbit_misc:r(VHostPath, realm, RealmName)) - end, - rabbit_realm:list_vhost_realms(VHostPath)), lists:foreach(fun (Username) -> ok = unmap_user_vhost(Username, VHostPath) end, @@ -290,77 +257,8 @@ unmap_user_vhost(Username, VHostPath) -> rabbit_misc:with_user_and_vhost( Username, VHostPath, fun () -> - lists:foreach(fun mnesia:delete_object/1, - user_realms(Username, - rabbit_misc:r(VHostPath, realm))), ok = mnesia:delete_object( #user_vhost{username = Username, virtual_host = VHostPath}) end)). -map_user_realm(Username, - Ticket = #ticket{realm_name = RealmName = - #resource{virtual_host = VHostPath, - kind = realm}}) -> - rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_user_and_vhost( - Username, VHostPath, - rabbit_misc:with_realm( - RealmName, - fun () -> - lists:foreach(fun mnesia:delete_object/1, - user_realms(Username, RealmName)), - case internal_lookup_vhost_access(Username, VHostPath) of - {ok, _R} -> - case ticket_liveness(Ticket) of - alive -> - ok = mnesia:write( - #user_realm{username = Username, - realm = RealmName, - ticket_pattern = Ticket}); - dead -> - ok - end; - not_found -> - mnesia:abort(not_mapped_to_vhost) - end - end))). - -list_user_realms(Username, VHostPath) -> - [{Name, Pattern} || - #user_realm{realm = #resource{name = Name}, - ticket_pattern = Pattern} <- - %% TODO: use dirty ops instead - rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_user_and_vhost( - Username, VHostPath, - fun () -> - case internal_lookup_vhost_access( - Username, VHostPath) of - {ok, _R} -> - user_realms(Username, - rabbit_misc:r(VHostPath, realm)); - not_found -> - mnesia:abort(not_mapped_to_vhost) - end - end))]. - -ticket_liveness(#ticket{passive_flag = false, - active_flag = false, - write_flag = false, - read_flag = false}) -> - dead; -ticket_liveness(_) -> - alive. - -full_ticket(RealmName) -> - #ticket{realm_name = RealmName, - passive_flag = true, - active_flag = true, - write_flag = true, - read_flag = true}. - -user_realms(Username, RealmName) -> - mnesia:match_object(#user_realm{username = Username, - realm = RealmName, - _ = '_'}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 63f043ba..bd64f1e4 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -25,15 +25,15 @@ -module(rabbit_amqqueue). --export([start/0, recover/0, declare/5, delete/3, purge/1, internal_delete/1]). --export([pseudo_queue/3]). +-export([start/0, recover/0, declare/4, delete/3, purge/1, internal_delete/1]). +-export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1, - stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4, - commit/2, rollback/2]). + stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]). -export([add_binding/4, delete_binding/4, binding_forcibly_removed/2]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). --export([notify_sent/2, notify_down/2]). +-export([notify_sent/2]). +-export([commit_all/2, rollback_all/2, notify_down_all/2]). -export([on_node_down/1]). -import(mnesia). @@ -44,6 +44,8 @@ -include("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). +-define(CALL_TIMEOUT, 5000). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -53,9 +55,12 @@ -type(qfun(A) :: fun ((amqqueue()) -> A)). -type(bind_res() :: {'ok', non_neg_integer()} | {'error', 'queue_not_found' | 'exchange_not_found'}). +-type(ok_or_errors() :: + 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). + -spec(start/0 :: () -> 'ok'). -spec(recover/0 :: () -> 'ok'). --spec(declare/5 :: (realm_name(), name(), bool(), bool(), amqp_table()) -> +-spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) -> amqqueue()). -spec(add_binding/4 :: (queue_name(), exchange_name(), routing_key(), amqp_table()) -> @@ -81,9 +86,9 @@ -spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok'). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). --spec(commit/2 :: (pid(), txn()) -> 'ok'). --spec(rollback/2 :: (pid(), txn()) -> 'ok'). --spec(notify_down/2 :: (amqqueue(), pid()) -> 'ok'). +-spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()). +-spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()). +-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). -spec(binding_forcibly_removed/2 :: (binding_spec(), queue_name()) -> 'ok'). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> @@ -96,7 +101,7 @@ -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (node()) -> 'ok'). --spec(pseudo_queue/3 :: (realm_name(), binary(), pid()) -> amqqueue()). +-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). -endif. @@ -130,9 +135,8 @@ recover_durable_queues() -> ok end). -declare(RealmName, NameBin, Durable, AutoDelete, Args) -> - QName = rabbit_misc:r(RealmName, queue, NameBin), - Q = start_queue_process(#amqqueue{name = QName, +declare(QueueName, Durable, AutoDelete, Args) -> + Q = start_queue_process(#amqqueue{name = QueueName, durable = Durable, auto_delete = AutoDelete, arguments = Args, @@ -140,9 +144,8 @@ declare(RealmName, NameBin, Durable, AutoDelete, Args) -> pid = none}), case rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({amqqueue, QName}) of + case mnesia:wread({amqqueue, QueueName}) of [] -> ok = recover_queue(Q), - ok = rabbit_realm:add(RealmName, QName), Q; [ExistingQ] -> ExistingQ end @@ -251,7 +254,7 @@ with(Name, F, E) -> end. with(Name, F) -> - with(Name, F, fun () -> {error, not_found} end). + with(Name, F, fun () -> {error, not_found} end). with_or_die(Name, F) -> with(Name, F, fun () -> rabbit_misc:protocol_error( not_found, "no ~s", [rabbit_misc:rs(Name)]) @@ -289,14 +292,29 @@ requeue(QPid, MsgIds, ChPid) -> ack(QPid, Txn, MsgIds, ChPid) -> gen_server:cast(QPid, {ack, Txn, MsgIds, ChPid}). -commit(QPid, Txn) -> - gen_server:call(QPid, {commit, Txn}). - -rollback(QPid, Txn) -> - gen_server:cast(QPid, {rollback, Txn}). - -notify_down(#amqqueue{ pid = QPid }, ChPid) -> - gen_server:call(QPid, {notify_down, ChPid}). +commit_all(QPids, Txn) -> + Timeout = length(QPids) * ?CALL_TIMEOUT, + safe_pmap_ok( + fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end, + QPids). + +rollback_all(QPids, Txn) -> + safe_pmap_ok( + fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end, + QPids). + +notify_down_all(QPids, ChPid) -> + Timeout = length(QPids) * ?CALL_TIMEOUT, + safe_pmap_ok( + fun (QPid) -> + rabbit_misc:with_exit_handler( + %% we don't care if the queue process has terminated + %% in the meantime + fun () -> ok end, + fun () -> gen_server:call(QPid, {notify_down, ChPid}, + Timeout) end) + end, + QPids). binding_forcibly_removed(BindingSpec, QueueName) -> rabbit_misc:execute_mnesia_transaction( @@ -338,28 +356,20 @@ internal_delete(QueueName) -> case mnesia:wread({amqqueue, QueueName}) of [] -> {error, not_found}; [Q] -> - ok = delete_temp(Q), + ok = delete_queue(Q), ok = mnesia:delete({durable_queues, QueueName}), - ok = rabbit_realm:delete_from_all(QueueName), ok end end). -delete_temp(Q = #amqqueue{name = QueueName}) -> +delete_queue(Q = #amqqueue{name = QueueName}) -> ok = delete_bindings(Q), ok = rabbit_exchange:delete_binding( default_binding_spec(QueueName), Q), ok = mnesia:delete({amqqueue, QueueName}), ok. -delete_queue(Q = #amqqueue{name = QueueName, durable = Durable}) -> - ok = delete_temp(Q), - if - Durable -> ok; - true -> ok = rabbit_realm:delete_from_all(QueueName) - end. - -on_node_down(Node) -> +on_node_down(Node) -> rabbit_misc:execute_mnesia_transaction( fun () -> qlc:fold( @@ -370,10 +380,23 @@ on_node_down(Node) -> node(Pid) == Node])) end). -pseudo_queue(RealmName, NameBin, Pid) -> - #amqqueue{name = rabbit_misc:r(RealmName, queue, NameBin), +pseudo_queue(QueueName, Pid) -> + #amqqueue{name = QueueName, durable = false, auto_delete = false, arguments = [], binding_specs = [], pid = Pid}. + +safe_pmap_ok(F, L) -> + case [R || R <- rabbit_misc:upmap( + fun (V) -> + try F(V) + catch Class:Reason -> {Class, Reason} + end + end, L), + R =/= ok] of + [] -> ok; + Errors -> {error, Errors} + end. + diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ec1d1fba..a9278898 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -37,7 +37,7 @@ transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, - most_recently_declared_queue, consumer_mapping, next_ticket}). + most_recently_declared_queue, consumer_mapping}). %%---------------------------------------------------------------------------- @@ -94,8 +94,7 @@ init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> username = Username, virtual_host = VHost, most_recently_declared_queue = <<>>, - consumer_mapping = dict:new(), - next_ticket = 101}. + consumer_mapping = dict:new()}. handle_message({method, Method, Content}, State) -> case (catch handle_method(Method, Content, State)) of @@ -140,7 +139,6 @@ handle_message(Other, State) -> terminate(Reason, State = #ch{writer_pid = WriterPid}) -> Res = notify_queues(internal_rollback(State)), - ok = rabbit_realm:leave_realms(self()), case Reason of normal -> ok = Res; _ -> ok @@ -195,14 +193,6 @@ die_precondition_failed(Fmt, Params) -> rabbit_misc:protocol_error({false, 406, <<"PRECONDITION_FAILED">>}, Fmt, Params). -check_ticket(TicketNumber, FieldIndex, Name, #ch{ username = Username}) -> - rabbit_ticket:check_ticket(TicketNumber, FieldIndex, Name, Username). - -lookup_ticket(TicketNumber, FieldIndex, - #ch{ username = Username, virtual_host = VHostPath }) -> - rabbit_ticket:lookup_ticket(TicketNumber, FieldIndex, - Username, VHostPath). - %% check that an exchange/queue name does not contain the reserved %% "amq." prefix. %% @@ -235,57 +225,19 @@ handle_method(_Method, _, #ch{state = starting}) -> handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> ok = notify_queues(internal_rollback(State)), - ok = rabbit_realm:leave_realms(self()), ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}), ok = rabbit_writer:shutdown(WriterPid), stop; -handle_method(#'access.request'{realm = RealmNameBin, - exclusive = Exclusive, - passive = Passive, - active = Active, - write = Write, - read = Read}, - _, State = #ch{username = Username, - virtual_host = VHostPath, - next_ticket = NextTicket}) -> - RealmName = rabbit_misc:r(VHostPath, realm, RealmNameBin), - Ticket = #ticket{realm_name = RealmName, - passive_flag = Passive, - active_flag = Active, - write_flag = Write, - read_flag = Read}, - case rabbit_realm:access_request(Username, Exclusive, Ticket) of - ok -> - rabbit_ticket:record_ticket(NextTicket, Ticket), - NewState = State#ch{next_ticket = NextTicket + 1}, - {reply, #'access.request_ok'{ticket = NextTicket}, NewState}; - {error, not_found} -> - rabbit_misc:protocol_error( - invalid_path, "no ~s", [rabbit_misc:rs(RealmName)]); - {error, bad_realm_path} -> - %% FIXME: spec bug? access_refused is a soft error, spec requires it to be hard - rabbit_misc:protocol_error( - access_refused, "bad path for ~s", [rabbit_misc:rs(RealmName)]); - {error, resource_locked} -> - rabbit_misc:protocol_error( - resource_locked, "~s is locked", [rabbit_misc:rs(RealmName)]); - {error, access_refused} -> - rabbit_misc:protocol_error( - access_refused, - "~w permissions denied for user '~s' attempting to access ~s", - [rabbit_misc:permission_list(Ticket), - Username, rabbit_misc:rs(RealmName)]) - end; +handle_method(#'access.request'{},_, State) -> + {reply, #'access.request_ok'{ticket = 1}, State}; -handle_method(#'basic.publish'{ticket = TicketNumber, - exchange = ExchangeNameBin, +handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory, immediate = Immediate}, Content, State = #ch{ virtual_host = VHostPath}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - check_ticket(TicketNumber, #ticket.write_flag, ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. @@ -323,13 +275,11 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, uncommitted_ack_q = NewUAQ}) end}; -handle_method(#'basic.get'{ticket = TicketNumber, - queue = QueueNameBin, +handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, _, State = #ch{ proxy_pid = ProxyPid, writer_pid = WriterPid, next_tag = DeliveryTag }) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), - check_ticket(TicketNumber, #ticket.read_flag, QueueName, State), case rabbit_amqqueue:with_or_die( QueueName, fun (Q) -> rabbit_amqqueue:basic_get(Q, ProxyPid, NoAck) end) of @@ -352,8 +302,7 @@ handle_method(#'basic.get'{ticket = TicketNumber, {reply, #'basic.get_empty'{cluster_id = <<>>}, State} end; -handle_method(#'basic.consume'{ticket = TicketNumber, - queue = QueueNameBin, +handle_method(#'basic.consume'{queue = QueueNameBin, consumer_tag = ConsumerTag, no_local = _, % FIXME: implement no_ack = NoAck, @@ -365,7 +314,6 @@ handle_method(#'basic.consume'{ticket = TicketNumber, case dict:find(ConsumerTag, ConsumerMapping) of error -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), - check_ticket(TicketNumber, #ticket.read_flag, QueueName, State), ActualConsumerTag = case ConsumerTag of <<>> -> rabbit_misc:binstring_guid("amq.ctag"); @@ -391,7 +339,7 @@ handle_method(#'basic.consume'{ticket = TicketNumber, ConsumerMapping)}}; {error, queue_owned_by_another_connection} -> %% The spec is silent on which exception to use - %% here. This seems reasonable? + %% here. This seems reasonable? %% FIXME: check this rabbit_misc:protocol_error( @@ -495,8 +443,7 @@ handle_method(#'basic.recover'{}, _, _State) -> rabbit_misc:protocol_error( not_allowed, "attempt to recover a transactional channel",[]); -handle_method(#'exchange.declare'{ticket = TicketNumber, - exchange = ExchangeNameBin, +handle_method(#'exchange.declare'{exchange = ExchangeNameBin, type = TypeNameBin, passive = false, durable = Durable, @@ -505,17 +452,13 @@ handle_method(#'exchange.declare'{ticket = TicketNumber, nowait = NoWait, arguments = Args}, _, State = #ch{ virtual_host = VHostPath }) -> - #ticket{realm_name = RealmName} = - lookup_ticket(TicketNumber, #ticket.active_flag, State), CheckedType = rabbit_exchange:check_type(TypeNameBin), - %% FIXME: clarify spec as per declare wrt differing realms - X = case rabbit_exchange:lookup( - rabbit_misc:r(VHostPath, exchange, ExchangeNameBin)) of + ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + X = case rabbit_exchange:lookup(ExchangeName) of {ok, FoundX} -> FoundX; {error, not_found} -> - ActualNameBin = check_name('exchange', ExchangeNameBin), - rabbit_exchange:declare(RealmName, - ActualNameBin, + check_name('exchange', ExchangeNameBin), + rabbit_exchange:declare(ExchangeName, CheckedType, Durable, AutoDelete, @@ -524,26 +467,21 @@ handle_method(#'exchange.declare'{ticket = TicketNumber, ok = rabbit_exchange:assert_type(X, CheckedType), return_ok(State, NoWait, #'exchange.declare_ok'{}); -handle_method(#'exchange.declare'{ticket = TicketNumber, - exchange = ExchangeNameBin, +handle_method(#'exchange.declare'{exchange = ExchangeNameBin, type = TypeNameBin, passive = true, nowait = NoWait}, _, State = #ch{ virtual_host = VHostPath }) -> - %% FIXME: spec issue: permit active_flag here as well as passive_flag? - #ticket{} = lookup_ticket(TicketNumber, #ticket.passive_flag, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), X = rabbit_exchange:lookup_or_die(ExchangeName), ok = rabbit_exchange:assert_type(X, rabbit_exchange:check_type(TypeNameBin)), return_ok(State, NoWait, #'exchange.declare_ok'{}); -handle_method(#'exchange.delete'{ticket = TicketNumber, - exchange = ExchangeNameBin, +handle_method(#'exchange.delete'{exchange = ExchangeNameBin, if_unused = IfUnused, nowait = NoWait}, _, State = #ch { virtual_host = VHostPath }) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), - check_ticket(TicketNumber, #ticket.active_flag, ExchangeName, State), case rabbit_exchange:delete(ExchangeName, IfUnused) of {error, not_found} -> rabbit_misc:protocol_error( @@ -555,8 +493,7 @@ handle_method(#'exchange.delete'{ticket = TicketNumber, return_ok(State, NoWait, #'exchange.delete_ok'{}) end; -handle_method(#'queue.declare'{ticket = TicketNumber, - queue = QueueNameBin, +handle_method(#'queue.declare'{queue = QueueNameBin, passive = false, durable = Durable, exclusive = ExclusiveDeclare, @@ -565,8 +502,6 @@ handle_method(#'queue.declare'{ticket = TicketNumber, arguments = Args}, _, State = #ch { virtual_host = VHostPath, reader_pid = ReaderPid }) -> - #ticket{realm_name = RealmName} = - lookup_ticket(TicketNumber, #ticket.active_flag, State), %% FIXME: atomic create&claim Finish = fun (Q) -> @@ -587,7 +522,6 @@ handle_method(#'queue.declare'{ticket = TicketNumber, end, Q end, - %% FIXME: clarify spec as per declare wrt differing realms Q = case rabbit_amqqueue:with( rabbit_misc:r(VHostPath, queue, QueueNameBin), Finish) of @@ -597,34 +531,28 @@ handle_method(#'queue.declare'{ticket = TicketNumber, <<>> -> rabbit_misc:binstring_guid("amq.gen"); Other -> check_name('queue', Other) end, - Finish(rabbit_amqqueue:declare(RealmName, - ActualNameBin, - Durable, - AutoDelete, - Args)); + QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), + Finish(rabbit_amqqueue:declare(QueueName, + Durable, AutoDelete, Args)); Other -> Other end, return_queue_declare_ok(State, NoWait, Q); -handle_method(#'queue.declare'{ticket = TicketNumber, - queue = QueueNameBin, +handle_method(#'queue.declare'{queue = QueueNameBin, passive = true, nowait = NoWait}, _, State = #ch{ virtual_host = VHostPath }) -> - #ticket{} = lookup_ticket(TicketNumber, #ticket.passive_flag, State), QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), Q = rabbit_amqqueue:with_or_die(QueueName, fun (Q) -> Q end), return_queue_declare_ok(State, NoWait, Q); -handle_method(#'queue.delete'{ticket = TicketNumber, - queue = QueueNameBin, +handle_method(#'queue.delete'{queue = QueueNameBin, if_unused = IfUnused, if_empty = IfEmpty, nowait = NoWait }, _, State) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), - check_ticket(TicketNumber, #ticket.active_flag, QueueName, State), case rabbit_amqqueue:with_or_die( QueueName, fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of @@ -640,8 +568,7 @@ handle_method(#'queue.delete'{ticket = TicketNumber, message_count = PurgedMessageCount}) end; -handle_method(#'queue.bind'{ticket = TicketNumber, - queue = QueueNameBin, +handle_method(#'queue.bind'{queue = QueueNameBin, exchange = ExchangeNameBin, routing_key = RoutingKey, nowait = NoWait, @@ -652,14 +579,13 @@ handle_method(#'queue.bind'{ticket = TicketNumber, QueueName = expand_queue_name_shortcut(QueueNameBin, State), ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey, State), - check_ticket(TicketNumber, #ticket.active_flag, QueueName, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), case rabbit_amqqueue:add_binding(QueueName, ExchangeName, ActualRoutingKey, Arguments) of - {error, queue_not_found} -> + {error, queue_not_found} -> rabbit_misc:protocol_error( not_found, "no ~s", [rabbit_misc:rs(QueueName)]); - {error, exchange_not_found} -> + {error, exchange_not_found} -> rabbit_misc:protocol_error( not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]); {error, durability_settings_incompatible} -> @@ -670,12 +596,10 @@ handle_method(#'queue.bind'{ticket = TicketNumber, return_ok(State, NoWait, #'queue.bind_ok'{}) end; -handle_method(#'queue.purge'{ticket = TicketNumber, - queue = QueueNameBin, +handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, _, State) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), - check_ticket(TicketNumber, #ticket.read_flag, QueueName, State), {ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die( QueueName, fun (Q) -> rabbit_amqqueue:purge(Q) end), @@ -783,21 +707,6 @@ ack(ProxyPid, TxnKey, UAQ) -> make_tx_id() -> rabbit_misc:guid(). -safe_pmap_set_ok(F, S) -> - case lists:filter(fun (R) -> R =/= ok end, - rabbit_misc:upmap( - fun (V) -> - try F(V) - catch Class:Reason -> {Class, Reason} - end - end, sets:to_list(S))) of - [] -> ok; - Errors -> {error, Errors} - end. - -notify_participants(F, TxnKey, Participants) -> - safe_pmap_set_ok(fun (QPid) -> F(QPid, TxnKey) end, Participants). - new_tx(State) -> State#ch{transaction_id = make_tx_id(), tx_participants = sets:new(), @@ -805,8 +714,8 @@ new_tx(State) -> internal_commit(State = #ch{transaction_id = TxnKey, tx_participants = Participants}) -> - case notify_participants(fun rabbit_amqqueue:commit/2, - TxnKey, Participants) of + case rabbit_amqqueue:commit_all(sets:to_list(Participants), + TxnKey) of ok -> new_tx(State); {error, Errors} -> exit({commit_failed, Errors}) end. @@ -819,8 +728,8 @@ internal_rollback(State = #ch{transaction_id = TxnKey, [self(), queue:len(UAQ), queue:len(UAMQ)]), - case notify_participants(fun rabbit_amqqueue:rollback/2, - TxnKey, Participants) of + case rabbit_amqqueue:rollback_all(sets:to_list(Participants), + TxnKey) of ok -> NewUAMQ = queue:join(UAQ, UAMQ), new_tx(State#ch{unacked_message_q = NewUAMQ}); {error, Errors} -> exit({rollback_failed, Errors}) @@ -843,23 +752,18 @@ fold_per_queue(F, Acc0, UAQ) -> Acc0, D). notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) -> - safe_pmap_set_ok( - fun (QueueName) -> - case rabbit_amqqueue:with( - QueueName, - fun (Q) -> - rabbit_amqqueue:notify_down(Q, ProxyPid) - end) of - ok -> - ok; - {error, not_found} -> - %% queue has been deleted in the meantime - ok - end - end, - dict:fold(fun (_ConsumerTag, QueueName, S) -> - sets:add_element(QueueName, S) - end, sets:new(), Consumers)). + rabbit_amqqueue:notify_down_all( + [QPid || QueueName <- + sets:to_list( + dict:fold(fun (_ConsumerTag, QueueName, S) -> + sets:add_element(QueueName, S) + end, sets:new(), Consumers)), + case rabbit_amqqueue:lookup(QueueName) of + {ok, Q} -> QPid = Q#amqqueue.pid, true; + %% queue has been deleted in the meantime + {error, not_found} -> QPid = none, false + end], + ProxyPid). is_message_persistent(#content{properties = #'P_basic'{ delivery_mode = Mode}}) -> diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index ad796b61..bc588279 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -73,6 +73,7 @@ Available commands: force_reset cluster <ClusterNode> ... status + rotate_logs [Suffix] add_user <UserName> <Password> delete_user <UserName> @@ -88,17 +89,6 @@ Available commands: list_user_vhosts <UserName> list_vhost_users <VHostPath> - add_realm <VHostPath> <RealmName> - delete_realm <VHostPath> <RealmName> - list_realms <VHostPath> - - set_permissions <UserName> <VHostPath> <RealmName> [<Permission> ...] - Permissions management. The available permissions are 'passive', - 'active', 'write' and 'read', corresponding to the permissions - referred to in AMQP's \"access.request\" message, or 'all' as an - abbreviation for all defined permission flags. - list_permissions <UserName> <VHostPath> - <node> should be the name of the master node of the RabbitMQ cluster. It defaults to the node named \"rabbit\" on the local host. On a host named \"server.example.com\", the master node will usually be rabbit@server (unless @@ -140,6 +130,13 @@ action(status, Node, []) -> io:format("~n~p~n", [Res]), ok; +action(rotate_logs, Node, []) -> + io:format("Reopening logs for node ~p ...", [Node]), + call(Node, {rabbit, rotate_logs, [""]}); +action(rotate_logs, Node, Args = [Suffix]) -> + io:format("Rotating logs to files with suffix ~p ...", [Suffix]), + call(Node, {rabbit, rotate_logs, Args}); + action(add_user, Node, Args = [Username, _Password]) -> io:format("Creating user ~p ...", [Username]), call(Node, {rabbit_access_control, add_user, Args}); @@ -182,68 +179,7 @@ action(list_user_vhosts, Node, Args = [_Username]) -> action(list_vhost_users, Node, Args = [_VHostPath]) -> io:format("Listing users for vhosts ~p...", Args), - display_list(call(Node, {rabbit_access_control, list_vhost_users, Args})); - -action(add_realm, Node, [VHostPath, RealmName]) -> - io:format("Adding realm ~p to vhost ~p ...", [RealmName, VHostPath]), - rpc_call(Node, rabbit_realm, add_realm, - [realm_rsrc(VHostPath, RealmName)]); - -action(delete_realm, Node, [VHostPath, RealmName]) -> - io:format("Deleting realm ~p from vhost ~p ...", [RealmName, VHostPath]), - rpc_call(Node, rabbit_realm, delete_realm, - [realm_rsrc(VHostPath, RealmName)]); - -action(list_realms, Node, Args = [_VHostPath]) -> - io:format("Listing realms for vhost ~p ...", Args), - display_list(call(Node, {rabbit_realm, list_vhost_realms, Args})); - -action(set_permissions, Node, - [Username, VHostPath, RealmName | Permissions]) -> - io:format("Setting permissions for user ~p, vhost ~p, realm ~p ...", - [Username, VHostPath, RealmName]), - CheckedPermissions = check_permissions(Permissions), - Ticket = #ticket{ - realm_name = realm_rsrc(VHostPath, RealmName), - passive_flag = lists:member(passive, CheckedPermissions), - active_flag = lists:member(active, CheckedPermissions), - write_flag = lists:member(write, CheckedPermissions), - read_flag = lists:member(read, CheckedPermissions)}, - rpc_call(Node, rabbit_access_control, map_user_realm, - [list_to_binary(Username), Ticket]); - -action(list_permissions, Node, Args = [_Username, _VHostPath]) -> - io:format("Listing permissions for user ~p in vhost ~p ...", Args), - Perms = call(Node, {rabbit_access_control, list_user_realms, Args}), - if is_list(Perms) -> - lists:foreach( - fun ({RealmName, Pattern}) -> - io:format("~n~s: ~p", - [binary_to_list(RealmName), - rabbit_misc:permission_list(Pattern)]) - end, - lists:sort(Perms)), - io:nl(), - ok; - true -> Perms - end. - -check_permissions([]) -> []; -check_permissions(["all" | R]) -> - [passive, active, write, read | check_permissions(R)]; -check_permissions([P | R]) when (P == "passive") or - (P == "active") or - (P == "write") or - (P == "read") -> - [list_to_atom(P) | check_permissions(R)]; -check_permissions([P | _R]) -> - io:format("~nError: invalid permission flag ~p~n", [P]), - usage(). - -realm_rsrc(VHostPath, RealmName) -> - rabbit_misc:r(list_to_binary(VHostPath), - realm, - list_to_binary(RealmName)). + display_list(call(Node, {rabbit_access_control, list_vhost_users, Args})). display_list(L) when is_list(L) -> lists:foreach(fun (I) -> diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 0ae116bb..9220d7b4 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -34,10 +34,7 @@ init([DefaultVHost]) -> #exchange{} = rabbit_exchange:declare( - #resource{virtual_host = DefaultVHost, - kind = realm, - name = <<"/admin">>}, - ?LOG_EXCH_NAME, + rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME), topic, true, false, []), {ok, #resource{virtual_host = DefaultVHost, kind = exchange, diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl new file mode 100644 index 00000000..d67b02ef --- /dev/null +++ b/src/rabbit_error_logger_file_h.erl @@ -0,0 +1,74 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_error_logger_file_h). + +-behaviour(gen_event). + +-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, code_change/3]). + +%% rabbit_error_logger_file_h is a wrapper around the error_logger_file_h +%% module because the original's init/1 does not match properly +%% with the result of closing the old handler when swapping handlers. +%% The first init/1 additionally allows for simple log rotation +%% when the suffix is not the empty string. + +%% Used only when swapping handlers in log rotation +init({{File, Suffix}, []}) -> + case rabbit_misc:append_file(File, Suffix) of + ok -> ok; + {error, Error} -> + rabbit_log:error("Failed to append contents of " ++ + "log file '~s' to '~s':~n~p~n", + [File, [File, Suffix], Error]) + end, + init(File); +%% Used only when swapping handlers and the original handler +%% failed to terminate or was never installed +init({{File, _}, error}) -> + init(File); +%% Used only when swapping handlers without performing +%% log rotation +init({File, []}) -> + init(File); +init({_File, _Type} = FileInfo) -> + error_logger_file_h:init(FileInfo); +init(File) -> + error_logger_file_h:init(File). + +handle_event(Event, State) -> + error_logger_file_h:handle_event(Event, State). + +handle_info(Event, State) -> + error_logger_file_h:handle_info(Event, State). + +handle_call(Event, State) -> + error_logger_file_h:handle_call(Event, State). + +terminate(Reason, State) -> + error_logger_file_h:terminate(Reason, State). + +code_change(OldVsn, State, Extra) -> + error_logger_file_h:code_change(OldVsn, State, Extra). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 113b7878..bb132a50 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -28,7 +28,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([recover/0, declare/6, lookup/1, lookup_or_die/1, +-export([recover/0, declare/5, lookup/1, lookup_or_die/1, list_vhost_exchanges/1, list_exchange_bindings/1, simple_publish/6, simple_publish/3, route/2]). @@ -50,21 +50,21 @@ not_found() | {'error', 'unroutable' | 'not_delivered'}). -spec(recover/0 :: () -> 'ok'). --spec(declare/6 :: (realm_name(), name(), exchange_type(), bool(), bool(), +-spec(declare/5 :: (exchange_name(), exchange_type(), bool(), bool(), amqp_table()) -> exchange()). -spec(check_type/1 :: (binary()) -> atom()). --spec(assert_type/2 :: (exchange(), atom()) -> 'ok'). +-spec(assert_type/2 :: (exchange(), atom()) -> 'ok'). -spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()). -spec(lookup_or_die/1 :: (exchange_name()) -> exchange()). -spec(list_vhost_exchanges/1 :: (vhost()) -> [exchange()]). --spec(list_exchange_bindings/1 :: (exchange_name()) -> +-spec(list_exchange_bindings/1 :: (exchange_name()) -> [{queue_name(), routing_key(), amqp_table()}]). -spec(simple_publish/6 :: (bool(), bool(), exchange_name(), routing_key(), binary(), binary()) -> publish_res()). -spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()). -spec(route/2 :: (exchange(), routing_key()) -> [pid()]). --spec(add_binding/2 :: (binding_spec(), amqqueue()) -> +-spec(add_binding/2 :: (binding_spec(), amqqueue()) -> 'ok' | not_found() | {'error', 'durability_settings_incompatible'}). -spec(delete_binding/2 :: (binding_spec(), amqqueue()) -> @@ -90,23 +90,21 @@ recover_durable_exchanges() -> end, ok, durable_exchanges) end). -declare(RealmName, NameBin, Type, Durable, AutoDelete, Args) -> - XName = rabbit_misc:r(RealmName, exchange, NameBin), - Exchange = #exchange{name = XName, +declare(ExchangeName, Type, Durable, AutoDelete, Args) -> + Exchange = #exchange{name = ExchangeName, type = Type, durable = Durable, auto_delete = AutoDelete, arguments = Args}, rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({exchange, XName}) of + case mnesia:wread({exchange, ExchangeName}) of [] -> ok = mnesia:write(Exchange), if Durable -> ok = mnesia:write( durable_exchanges, Exchange, write); true -> ok end, - ok = rabbit_realm:add(RealmName, XName), Exchange; [ExistingX] -> ExistingX end @@ -147,15 +145,14 @@ list_vhost_exchanges(VHostPath) -> list_exchange_bindings(Name) -> [{QueueName, RoutingKey, Arguments} || - #binding{handlers = Handlers} <- bindings_for_exchange(Name), - #handler{binding_spec = #binding_spec{routing_key = RoutingKey, - arguments = Arguments}, - queue = QueueName} <- Handlers]. + #binding{handlers = Handlers} <- bindings_for_exchange(Name), + #handler{binding_spec = #binding_spec{routing_key = RoutingKey, + arguments = Arguments}, + queue = QueueName} <- Handlers]. bindings_for_exchange(Name) -> - qlc:e(qlc:q([B || - B = #binding{key = K} <- mnesia:table(binding), - element(1, K) == Name])). + qlc:e(qlc:q([B || B = #binding{key = K} <- mnesia:table(binding), + element(1, K) == Name])). empty_handlers() -> []. @@ -187,7 +184,7 @@ simple_publish(Mandatory, Immediate, %% return the list of qpids to which a message with a given routing %% key, sent to a particular exchange, should be delivered. -%% +%% %% The function ensures that a qpid appears in the return list exactly %% as many times as a message should be delivered to it. With the %% current exchange types that is at most once. @@ -197,7 +194,7 @@ route(#exchange{name = Name, type = topic}, RoutingKey) -> mnesia:activity( async_dirty, fun () -> - qlc:e(qlc:q([handler_qpids(H) || + qlc:e(qlc:q([handler_qpids(H) || #binding{key = {Name1, PatternKey}, handlers = H} <- mnesia:table(binding), @@ -375,6 +372,5 @@ do_internal_delete(ExchangeName, Bindings) -> ok = mnesia:delete({binding, K}) end, Bindings), ok = mnesia:delete({durable_exchanges, ExchangeName}), - ok = mnesia:delete({exchange, ExchangeName}), - ok = rabbit_realm:delete_from_all(ExchangeName) + ok = mnesia:delete({exchange, ExchangeName}) end. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index b71aba42..89648f4f 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -26,23 +26,23 @@ -module(rabbit_misc). -include("rabbit.hrl"). -include("rabbit_framing.hrl"). +-include_lib("kernel/include/file.hrl"). -export([method_record_type/1, polite_pause/0, polite_pause/1]). -export([die/1, frame_error/2, protocol_error/3, protocol_error/4]). --export([strict_ticket_checking/0]). -export([get_config/1, get_config/2, set_config/2]). -export([dirty_read/1]). -export([r/3, r/2, rs/1]). --export([permission_list/1]). -export([enable_cover/0, report_cover/0]). -export([throw_on_error/2, with_exit_handler/2]). --export([with_user/2, with_vhost/2, with_realm/2, with_user_and_vhost/3]). +-export([with_user/2, with_vhost/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). -export([ensure_ok/2]). -export([localnode/1, tcp_name/3]). -export([intersperse/2, upmap/2, map_in_order/2]). -export([guid/0, string_guid/1, binstring_guid/1]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). +-export([append_file/2]). -import(mnesia). -import(lists). @@ -64,34 +64,30 @@ (atom() | amqp_error(), string(), [any()]) -> no_return()). -spec(protocol_error/4 :: (atom() | amqp_error(), string(), [any()], atom()) -> no_return()). --spec(strict_ticket_checking/0 :: () -> bool()). -spec(get_config/1 :: (atom()) -> {'ok', any()} | not_found()). -spec(get_config/2 :: (atom(), A) -> A). -spec(set_config/2 :: (atom(), any()) -> 'ok'). -spec(dirty_read/1 :: ({atom(), any()}) -> {'ok', any()} | not_found()). --spec(r/3 :: (realm_name() | vhost(), K, name()) -> - r(K) when is_subtype(K, atom())). +-spec(r/3 :: (vhost(), K, resource_name()) -> r(K) when is_subtype(K, atom())). -spec(r/2 :: (vhost(), K) -> #resource{virtual_host :: vhost(), kind :: K, name :: '_'} when is_subtype(K, atom())). --spec(rs/1 :: (r(atom())) -> string()). --spec(permission_list/1 :: (ticket()) -> [permission()]). +-spec(rs/1 :: (r(atom())) -> string()). -spec(enable_cover/0 :: () -> 'ok' | {'error', any()}). -spec(report_cover/0 :: () -> 'ok'). -spec(throw_on_error/2 :: (atom(), thunk({error, any()} | {ok, A} | A)) -> A). --spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). --spec(with_user/2 :: (username(), thunk(A)) -> A). +-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). +-spec(with_user/2 :: (username(), thunk(A)) -> A). -spec(with_vhost/2 :: (vhost(), thunk(A)) -> A). --spec(with_realm/2 :: (realm_name(), thunk(A)) -> A). --spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A). +-spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A). -spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A). --spec(ensure_ok/2 :: ('ok' | {'error', any()}, atom()) -> 'ok'). +-spec(ensure_ok/2 :: ('ok' | {'error', any()}, atom()) -> 'ok'). -spec(localnode/1 :: (atom()) -> node()). --spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()). +-spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()). -spec(intersperse/2 :: (A, [A]) -> [A]). --spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]). +-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(guid/0 :: () -> guid()). -spec(string_guid/1 :: (any()) -> string()). @@ -100,6 +96,7 @@ -spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> 'ok' | 'aborted'). -spec(dirty_dump_log/1 :: (string()) -> 'ok' | {'error', any()}). +-spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}). -endif. @@ -130,24 +127,6 @@ protocol_error(Error, Explanation, Params, Method) -> CompleteExplanation = lists:flatten(io_lib:format(Explanation, Params)), exit({amqp, Error, CompleteExplanation, Method}). -boolean_config_param(Name, TrueValue, FalseValue, DefaultValue) -> - ActualValue = get_config(Name, DefaultValue), - if - ActualValue == TrueValue -> - true; - ActualValue == FalseValue -> - false; - true -> - rabbit_log:error( - "Bad setting for config param '~w': ~p~n" ++ - "legal values are '~w', '~w'; using default value '~w'", - [Name, ActualValue, TrueValue, FalseValue, DefaultValue]), - DefaultValue == TrueValue - end. - -strict_ticket_checking() -> - boolean_config_param(strict_ticket_checking, enabled, disabled, disabled). - get_config(Key) -> case dirty_read({rabbit_config, Key}) of {ok, {rabbit_config, Key, V}} -> {ok, V}; @@ -182,19 +161,6 @@ rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) -> lists:flatten(io_lib:format("~s '~s' in vhost '~s'", [Kind, Name, VHostPath])). -permission_list(Ticket = #ticket{}) -> - lists:foldr(fun ({Field, Label}, L) -> - case element(Field, Ticket) of - true -> [Label | L]; - false -> L - end - end, - [], - [{#ticket.passive_flag, passive}, - {#ticket.active_flag, active}, - {#ticket.write_flag, write}, - {#ticket.read_flag, read}]). - enable_cover() -> case cover:compile_beam_directory("ebin") of {error,Reason} -> {error,Reason}; @@ -260,32 +226,13 @@ with_user(Username, Thunk) -> with_vhost(VHostPath, Thunk) -> fun () -> case mnesia:read({vhost, VHostPath}) of - [] -> + [] -> mnesia:abort({no_such_vhost, VHostPath}); [_V] -> Thunk() end end. -with_realm(Name = #resource{virtual_host = VHostPath, kind = realm}, - Thunk) -> - fun () -> - case mnesia:read({realm, Name}) of - [] -> - mnesia:abort({no_such_realm, Name}); - [_R] -> - case mnesia:match_object( - #vhost_realm{virtual_host = VHostPath, - realm = Name}) of - [] -> - %% This should never happen - mnesia:abort({no_such_realm, Name}); - [_VR] -> - Thunk() - end - end - end. - with_user_and_vhost(Username, VHostPath, Thunk) -> with_user(Username, with_vhost(VHostPath, Thunk)). @@ -398,3 +345,24 @@ dirty_dump_log1(LH, {K, Terms}) -> dirty_dump_log1(LH, {K, Terms, BadBytes}) -> io:format("Bad Chunk, ~p: ~p~n", [BadBytes, Terms]), dirty_dump_log1(LH, disk_log:chunk(LH, K)). + + +append_file(File, Suffix) -> + case file:read_file_info(File) of + {ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix); + {error, enoent} -> append_file(File, 0, Suffix); + Error -> Error + end. + +append_file(_, _, "") -> + ok; +append_file(File, 0, Suffix) -> + case file:open([File, Suffix], [append]) of + {ok, Fd} -> file:close(Fd); + Error -> Error + end; +append_file(File, _, Suffix) -> + case file:read_file(File) of + {ok, Data} -> file:write_file([File, Suffix], Data, [append]); + Error -> Error + end. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 82b80cb4..4ae367ba 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -102,23 +102,6 @@ table_definitions() -> {index, [virtual_host]}]}, {vhost, [{disc_copies, [node()]}, {attributes, record_info(fields, vhost)}]}, - {vhost_realm, [{type, bag}, - {disc_copies, [node()]}, - {attributes, record_info(fields, vhost_realm)}, - {index, [realm]}]}, - {realm, [{disc_copies, [node()]}, - {attributes, record_info(fields, realm)}]}, - {user_realm, [{type, bag}, - {disc_copies, [node()]}, - {attributes, record_info(fields, user_realm)}, - {index, [realm]}]}, - {exclusive_realm_visitor, - [{record_name, realm_visitor}, - {attributes, record_info(fields, realm_visitor)}, - {index, [pid]}]}, - {realm_visitor, [{type, bag}, - {attributes, record_info(fields, realm_visitor)}, - {index, [pid]}]}, {rabbit_config, [{disc_copies, [node()]}]}, {listener, [{type, bag}, {attributes, record_info(fields, listener)}]}, @@ -257,7 +240,6 @@ init_db(ClusterNodes) -> ClusterNodes}}) end; {ok, [_|_]} -> - ok = ensure_schema_integrity(), ok = wait_for_tables(), ok = create_local_table_copies( case IsDiskNode of @@ -341,6 +323,7 @@ create_local_table_copy(Tab, Type) -> ok. wait_for_tables() -> + ok = ensure_schema_integrity(), case mnesia:wait_for_tables(table_names(), 30000) of ok -> ok; {timeout, BadTabs} -> diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index cd92f1ac..c6a7e920 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -70,7 +70,9 @@ usage() -> Available commands: start_all <NodeCount> - start a local cluster of RabbitMQ nodes. + status - print status of all running nodes stop_all - stops all local RabbitMQ nodes. + rotate_logs [Suffix] - rotate logs for all local and running RabbitMQ nodes. "), halt(3). @@ -87,13 +89,46 @@ action(start_all, [NodeCount], RpcTimeout) -> false -> timeout end; +action(status, [], RpcTimeout) -> + io:format("Status of all running nodes...~n", []), + call_all_nodes( + fun({Node, Pid}) -> + Status = rpc:call(Node, rabbit, status, [], RpcTimeout), + io:format("Node '~p' with Pid ~p: ~p~n", + [Node, Pid, case parse_status(Status) of + false -> not_running; + true -> running + end]) + end); + action(stop_all, [], RpcTimeout) -> io:format("Stopping all nodes...~n", []), - case read_pids_file() of - [] -> throw(no_nodes_running); - NodePids -> stop_nodes(NodePids, RpcTimeout), - delete_pids_file() - end. + call_all_nodes(fun({Node, Pid}) -> + io:format("Stopping node ~p~n", [Node]), + rpc:call(Node, rabbit, stop_and_halt, []), + case kill_wait(Pid, RpcTimeout, false) of + false -> kill_wait(Pid, RpcTimeout, true); + true -> ok + end, + io:format("OK~n", []) + end), + delete_pids_file(); + +action(rotate_logs, [], RpcTimeout) -> + action(rotate_logs, [""], RpcTimeout); + +action(rotate_logs, [Suffix], RpcTimeout) -> + io:format("Rotating logs for all nodes...~n", []), + BinarySuffix = list_to_binary(Suffix), + call_all_nodes( + fun ({Node, _}) -> + io:format("Rotating logs for node ~p", [Node]), + case rpc:call(Node, rabbit, rotate_logs, + [BinarySuffix], RpcTimeout) of + {badrpc, Error} -> io:format(": ~p.~n", [Error]); + ok -> io:format(": ok.~n", []) + end + end). %% PNodePid is the list of PIDs %% Running is a boolean exhibiting success at some moment @@ -222,21 +257,6 @@ read_pids_file() -> FileName, Reason}}) end. -stop_nodes([],_) -> ok; - -stop_nodes([NodePid | Rest], RpcTimeout) -> - stop_node(NodePid, RpcTimeout), - stop_nodes(Rest, RpcTimeout). - -stop_node({Node, Pid}, RpcTimeout) -> - io:format("Stopping node ~p~n", [Node]), - rpc:call(Node, rabbit, stop_and_halt, []), - case kill_wait(Pid, RpcTimeout, false) of - false -> kill_wait(Pid, RpcTimeout, true); - true -> ok - end, - io:format("OK~n", []). - kill_wait(Pid, TimeLeft, Forceful) when TimeLeft < 0 -> Cmd = with_os([{unix, fun () -> if Forceful -> "kill -9"; true -> "kill" @@ -272,6 +292,12 @@ is_dead(Pid) -> end end}]). +call_all_nodes(Func) -> + case read_pids_file() of + [] -> throw(no_nodes_running); + NodePids -> lists:foreach(Func, NodePids) + end. + getenv(Var) -> case os:getenv(Var) of false -> throw({missing_env_var, Var}); diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index beef5285..2fb582a9 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -60,7 +60,6 @@ handle_info({nodedown, Node}, State) -> %% lots of nodes. We really only need to execute this code on %% *one* node, rather than all of them. ok = rabbit_networking:on_node_down(Node), - ok = rabbit_realm:on_node_down(Node), ok = rabbit_amqqueue:on_node_down(Node), {noreply, State}; handle_info(_Info, State) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 38349a1c..ce26c11a 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -59,6 +59,7 @@ %% all states, unless specified otherwise: %% socket error -> *exit* %% socket close -> *throw* +%% writer send failure -> *throw* %% forced termination -> *exit* %% handshake_timeout -> *throw* %% pre-init: @@ -93,10 +94,18 @@ %% terminate_channel timeout -> remove 'closing' mark, *closing* %% handshake_timeout -> ignore, *closing* %% heartbeat timeout -> *throw* -%% channel exit -> -%% if abnormal exit then log error -%% if last channel to exit then send connection.close_ok, start -%% terminate_connection timer, *closing* +%% channel exit with hard error +%% -> log error, wait for channels to terminate forcefully, start +%% terminate_connection timer, send close, *closed* +%% channel exit with soft error +%% -> log error, start terminate_channel timer, mark channel as +%% closing +%% if last channel to exit then send connection.close_ok, +%% start terminate_connection timer, *closed* +%% else *closing* +%% channel exits normally +%% -> if last channel to exit then send connection.close_ok, +%% start terminate_connection timer, *closed* %% closed: %% socket close -> *terminate* %% receive connection.close_ok -> self() ! terminate_connection, @@ -243,6 +252,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> %% since this termination is initiated by our parent it is %% probably more important to exit quickly. exit(Reason); + {'EXIT', _Pid, E = {writer, send_failed, _Error}} -> + throw(E); {'EXIT', Pid, Reason} -> mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State)); {terminate_channel, Channel, Ref1} -> @@ -302,24 +313,13 @@ terminate_channel(Channel, Ref, State) -> end, State. -handle_dependent_exit(Pid, Reason, - State = #v1{connection_state = closing}) -> - case channel_cleanup(Pid) of - undefined -> exit({abnormal_dependent_exit, Pid, Reason}); - Channel -> - case Reason of - normal -> ok; - _ -> log_channel_error(closing, Channel, Reason) - end, - maybe_close(State) - end; handle_dependent_exit(Pid, normal, State) -> channel_cleanup(Pid), - State; + maybe_close(State); handle_dependent_exit(Pid, Reason, State) -> case channel_cleanup(Pid) of undefined -> exit({abnormal_dependent_exit, Pid, Reason}); - Channel -> handle_exception(State, Channel, Reason) + Channel -> maybe_close(handle_exception(State, Channel, Reason)) end. channel_cleanup(Pid) -> @@ -376,13 +376,15 @@ wait_for_channel_termination(N, TimerRef) -> exit(channel_termination_timeout) end. -maybe_close(State) -> +maybe_close(State = #v1{connection_state = closing}) -> case all_channels() of [] -> ok = send_on_channel0( State#v1.sock, #'connection.close_ok'{}), close_connection(State); _ -> State - end. + end; +maybe_close(State) -> + State. handle_frame(Type, 0, Payload, State = #v1{connection_state = CS}) when CS =:= closing; CS =:= closed -> diff --git a/src/rabbit_realm.erl b/src/rabbit_realm.erl deleted file mode 100644 index 4463954d..00000000 --- a/src/rabbit_realm.erl +++ /dev/null @@ -1,316 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd., -%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd., Cohesive Financial Technologies -%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 -%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit -%% Technologies Ltd.; -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_realm). - --export([recover/0]). --export([add_realm/1, delete_realm/1, list_vhost_realms/1]). --export([add/2, delete/2, check/2, delete_from_all/1]). --export([access_request/3, enter_realm/3, leave_realms/1]). --export([on_node_down/1]). - --include("rabbit.hrl"). --include_lib("stdlib/include/qlc.hrl"). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --type(e_or_q() :: 'exchange' | 'queue'). - --spec(recover/0 :: () -> 'ok'). --spec(add_realm/1 :: (realm_name()) -> 'ok'). --spec(delete_realm/1 :: (realm_name()) -> 'ok'). --spec(list_vhost_realms/1 :: (vhost()) -> [name()]). --spec(add/2 :: (realm_name(), r(e_or_q())) -> 'ok'). --spec(delete/2 :: (realm_name(), r(e_or_q())) -> 'ok'). --spec(check/2 :: (realm_name(), r(e_or_q())) -> bool() | not_found()). --spec(delete_from_all/1 :: (r(e_or_q())) -> 'ok'). --spec(access_request/3 :: (username(), bool(), ticket()) -> - 'ok' | not_found() | {'error', 'bad_realm_path' | - 'access_refused' | - 'resource_locked'}). --spec(enter_realm/3 :: (realm_name(), bool(), pid()) -> - 'ok' | {'error', 'resource_locked'}). --spec(leave_realms/1 :: (pid()) -> 'ok'). --spec(on_node_down/1 :: (node()) -> 'ok'). - --endif. - -%%-------------------------------------------------------------------- - -recover() -> - %% preens resource lists, limiting them to currently-extant resources - rabbit_misc:execute_mnesia_transaction( - fun () -> - Realms = mnesia:foldl(fun preen_realm/2, [], realm), - lists:foreach(fun mnesia:write/1, Realms), - ok - end). - -add_realm(Name = #resource{virtual_host = VHostPath, kind = realm}) -> - rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_vhost( - VHostPath, - fun () -> - case mnesia:read({realm, Name}) of - [] -> - NewRealm = #realm{name = Name, - exchanges = ordsets:new(), - queues = ordsets:new()}, - ok = mnesia:write(NewRealm), - ok = mnesia:write( - #vhost_realm{virtual_host = VHostPath, - realm = Name}), - ok; - [_R] -> - mnesia:abort({realm_already_exists, Name}) - end - end)). - -delete_realm(Name = #resource{virtual_host = VHostPath, kind = realm}) -> - rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_vhost( - VHostPath, - rabbit_misc:with_realm( - Name, - fun () -> - ok = mnesia:delete({realm, Name}), - ok = mnesia:delete_object( - #vhost_realm{virtual_host = VHostPath, - realm = Name}), - lists:foreach(fun mnesia:delete_object/1, - mnesia:index_read(user_realm, Name, - #user_realm.realm)), - ok - end))). - -list_vhost_realms(VHostPath) -> - [Name || - #vhost_realm{realm = #resource{name = Name}} <- - %% TODO: use dirty ops instead - rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_vhost( - VHostPath, - fun () -> mnesia:read({vhost_realm, VHostPath}) end))]. - -add(Name = #resource{kind = realm}, Resource) -> - internal_update_realm_byname(Name, Resource, fun ordsets:add_element/2). - -delete(Name = #resource{kind = realm}, Resource) -> - internal_update_realm_byname(Name, Resource, fun ordsets:del_element/2). - -check(Name = #resource{kind = realm}, Resource = #resource{kind = Kind}) -> - case rabbit_misc:dirty_read({realm, Name}) of - {ok, R} -> - case Kind of - exchange -> ordsets:is_element(Resource, R#realm.exchanges); - queue -> ordsets:is_element(Resource, R#realm.queues) - end; - Other -> Other - end. - -% Requires a mnesia transaction. -delete_from_all(Resource = #resource{kind = Kind}) -> - Realms = mnesia:foldl - (fun (Realm = #realm{exchanges = E0, - queues = Q0}, - Acc) -> - IsMember = lists:member(Resource, - case Kind of - exchange -> E0; - queue -> Q0 - end), - if - IsMember -> - [internal_update_realm_record( - Realm, Resource, - fun ordsets:del_element/2) - | Acc]; - true -> - Acc - end - end, [], realm), - lists:foreach(fun mnesia:write/1, Realms), - ok. - -access_request(Username, Exclusive, Ticket = #ticket{realm_name = RealmName}) - when is_binary(Username) -> - %% FIXME: We should do this all in a single tx. Otherwise we may - %% a) get weird answers, b) create inconsistencies in the db - %% (e.g. realm_visitor records referring to non-existing realms). - case check_and_lookup(RealmName) of - {error, Reason} -> - {error, Reason}; - {ok, _Realm} -> - {ok, U} = rabbit_access_control:lookup_user(Username), - case rabbit_access_control:lookup_realm_access(U, RealmName) of - none -> - {error, access_refused}; - TicketPattern -> - case match_ticket(TicketPattern, Ticket) of - no_match -> - {error, access_refused}; - match -> - enter_realm(RealmName, Exclusive, self()) - end - end - end. - -enter_realm(Name = #resource{kind = realm}, IsExclusive, Pid) -> - RealmVisitor = #realm_visitor{realm = Name, pid = Pid}, - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:read({exclusive_realm_visitor, Name}) of - [] when IsExclusive -> - ok = mnesia:delete_object(RealmVisitor), - %% TODO: find a more efficient way of checking - %% for "no machting results" that doesn't - %% involve retrieving all the records - case mnesia:read({realm_visitor, Name}) of - [] -> - mnesia:write( - exclusive_realm_visitor, RealmVisitor, write), - ok; - [_|_] -> - {error, resource_locked} - end; - [] -> - ok = mnesia:write(RealmVisitor), - ok; - [RealmVisitor] when IsExclusive -> ok; - [RealmVisitor] -> - ok = mnesia:delete({exclusive_realm_visitor, Name}), - ok = mnesia:write(RealmVisitor), - ok; - [_] -> - {error, resource_locked} - end - end). - -leave_realms(Pid) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:index_read(exclusive_realm_visitor, Pid, - #realm_visitor.pid) of - [] -> ok; - [R] -> - ok = mnesia:delete_object( - exclusive_realm_visitor, R, write) - end, - lists:foreach(fun mnesia:delete_object/1, - mnesia:index_read(realm_visitor, Pid, - #realm_visitor.pid)), - ok - end). - -on_node_down(Node) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - lists:foreach( - fun (T) -> ok = remove_visitors(Node, T) end, - [exclusive_realm_visitor, realm_visitor]), - ok - end). - -%%-------------------------------------------------------------------- - -preen_realm(Realm = #realm{name = #resource{kind = realm}, - exchanges = E0, - queues = Q0}, - Realms) -> - [Realm#realm{exchanges = filter_out_missing(E0, exchange), - queues = filter_out_missing(Q0, amqqueue)} - | Realms]. - -filter_out_missing(Items, TableName) -> - ordsets:filter(fun (Item) -> - case mnesia:read({TableName, Item}) of - [] -> false; - _ -> true - end - end, Items). - -internal_update_realm_byname(Name, Resource, SetUpdater) -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:read({realm, Name}) of - [] -> - mnesia:abort(not_found); - [R] -> - ok = mnesia:write(internal_update_realm_record - (R, Resource, SetUpdater)) - end - end). - -internal_update_realm_record(R = #realm{exchanges = E0, queues = Q0}, - Resource = #resource{kind = Kind}, - SetUpdater) -> - case Kind of - exchange -> R#realm{exchanges = SetUpdater(Resource, E0)}; - queue -> R#realm{queues = SetUpdater(Resource, Q0)} - end. - -check_and_lookup(RealmName = #resource{kind = realm, - name = <<"/data", _/binary>>}) -> - lookup(RealmName); -check_and_lookup(RealmName = #resource{kind = realm, - name = <<"/admin", _/binary>>}) -> - lookup(RealmName); -check_and_lookup(_) -> - {error, bad_realm_path}. - -lookup(Name = #resource{kind = realm}) -> - rabbit_misc:dirty_read({realm, Name}). - -match_ticket(#ticket{passive_flag = PP, - active_flag = PA, - write_flag = PW, - read_flag = PR}, - #ticket{passive_flag = TP, - active_flag = TA, - write_flag = TW, - read_flag = TR}) -> - if - %% Matches if either we're not requesting passive access, or - %% passive access is permitted, and ... - (not(TP) orelse PP) andalso - (not(TA) orelse PA) andalso - (not(TW) orelse PW) andalso - (not(TR) orelse PR) -> - match; - true -> - no_match - end. - -remove_visitors(Node, T) -> - qlc:fold( - fun (R, Acc) -> - ok = mnesia:delete_object(T, R, write), - Acc - end, - ok, - qlc:q([R || R = #realm_visitor{pid = Pid} <- mnesia:table(T), - node(Pid) == Node])). diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 41a8d64c..a2337647 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -150,11 +150,9 @@ run_bindings(QPids, IsMandatory, IsImmediate, Txn, Message) -> fun (QPid, {Routed, Handled}) -> case catch rabbit_amqqueue:deliver(IsMandatory, IsImmediate, Txn, Message, QPid) of - true -> {true, [QPid | Handled]}; - false -> {true, Handled}; - {'EXIT', Reason} -> rabbit_log:warning("delivery to ~p failed:~n~p~n", - [QPid, Reason]), - {Routed, Handled} + true -> {true, [QPid | Handled]}; + false -> {true, Handled}; + {'EXIT', _Reason} -> {Routed, Handled} end end, {false, []}, diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl new file mode 100644 index 00000000..3374d63d --- /dev/null +++ b/src/rabbit_sasl_report_file_h.erl @@ -0,0 +1,86 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial Technologies +%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 +%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_sasl_report_file_h). + +-behaviour(gen_event). + +-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, code_change/3]). + +%% rabbit_sasl_report_file_h is a wrapper around the sasl_report_file_h +%% module because the original's init/1 does not match properly +%% with the result of closing the old handler when swapping handlers. +%% The first init/1 additionally allows for simple log rotation +%% when the suffix is not the empty string. + +%% Used only when swapping handlers and performing +%% log rotation +init({{File, Suffix}, []}) -> + case rabbit_misc:append_file(File, Suffix) of + ok -> ok; + {error, Error} -> + rabbit_log:error("Failed to append contents of " ++ + "sasl log file '~s' to '~s':~n~p~n", + [File, [File, Suffix], Error]) + end, + init(File); +%% Used only when swapping handlers and the original handler +%% failed to terminate or was never installed +init({{File, _}, error}) -> + init(File); +%% Used only when swapping handlers without +%% doing any log rotation +init({File, []}) -> + init(File); +init({_File, _Type} = FileInfo) -> + sasl_report_file_h:init(FileInfo); +init(File) -> + sasl_report_file_h:init({File, sasl_error_logger_type()}). + +handle_event(Event, State) -> + sasl_report_file_h:handle_event(Event, State). + +handle_info(Event, State) -> + sasl_report_file_h:handle_info(Event, State). + +handle_call(Event, State) -> + sasl_report_file_h:handle_call(Event, State). + +terminate(Reason, State) -> + sasl_report_file_h:terminate(Reason, State). + +code_change(OldVsn, State, Extra) -> + sasl_report_file_h:code_change(OldVsn, State, Extra). + +%%---------------------------------------------------------------------- + +sasl_error_logger_type() -> + case application:get_env(sasl, errlog_type) of + {ok, error} -> error; + {ok, progress} -> progress; + {ok, all} -> all; + {ok, Bad} -> throw({error, {wrong_errlog_type, Bad}}); + _ -> all + end. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index beeb3508..fff02d73 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -29,6 +29,8 @@ -import(lists). +-include_lib("kernel/include/file.hrl"). + test_content_prop_roundtrip(Datum, Binary) -> Types = [element(1, E) || E <- Datum], Values = [element(2, E) || E <- Datum], @@ -38,7 +40,9 @@ test_content_prop_roundtrip(Datum, Binary) -> all_tests() -> passed = test_parsing(), passed = test_topic_matching(), + passed = test_log_management(), passed = test_app_management(), + passed = test_log_management_during_startup(), passed = test_cluster_management(), passed = test_user_management(), passed. @@ -136,6 +140,134 @@ test_app_management() -> ok = control_action(status, []), passed. +test_log_management() -> + MainLog = rabbit:log_location(kernel), + SaslLog = rabbit:log_location(sasl), + Suffix = ".1", + + %% prepare basic logs + file:delete([MainLog, Suffix]), + file:delete([SaslLog, Suffix]), + + %% simple logs reopening + ok = control_action(rotate_logs, []), + [true, true] = empty_files([MainLog, SaslLog]), + ok = test_logs_working(MainLog, SaslLog), + + %% simple log rotation + ok = control_action(rotate_logs, [Suffix]), + [true, true] = non_empty_files([[MainLog, Suffix], [SaslLog, Suffix]]), + [true, true] = empty_files([MainLog, SaslLog]), + ok = test_logs_working(MainLog, SaslLog), + + %% reopening logs with log rotation performed first + ok = clean_logs([MainLog, SaslLog], Suffix), + ok = control_action(rotate_logs, []), + ok = file:rename(MainLog, [MainLog, Suffix]), + ok = file:rename(SaslLog, [SaslLog, Suffix]), + ok = test_logs_working([MainLog, Suffix], [SaslLog, Suffix]), + ok = control_action(rotate_logs, []), + ok = test_logs_working(MainLog, SaslLog), + + %% log rotation on empty file + ok = clean_logs([MainLog, SaslLog], Suffix), + ok = control_action(rotate_logs, []), + ok = control_action(rotate_logs, [Suffix]), + [true, true] = empty_files([[MainLog, Suffix], [SaslLog, Suffix]]), + + %% original main log file is not writable + ok = make_files_non_writable([MainLog]), + {error, {cannot_rotate_main_logs, _}} = control_action(rotate_logs, []), + ok = clean_logs([MainLog], Suffix), + ok = add_log_handlers([{rabbit_error_logger_file_h, MainLog}]), + + %% original sasl log file is not writable + ok = make_files_non_writable([SaslLog]), + {error, {cannot_rotate_sasl_logs, _}} = control_action(rotate_logs, []), + ok = clean_logs([SaslLog], Suffix), + ok = add_log_handlers([{rabbit_sasl_report_file_h, SaslLog}]), + + %% logs with suffix are not writable + ok = control_action(rotate_logs, [Suffix]), + ok = make_files_non_writable([[MainLog, Suffix], [SaslLog, Suffix]]), + ok = control_action(rotate_logs, [Suffix]), + ok = test_logs_working(MainLog, SaslLog), + + %% original log files are not writable + ok = make_files_non_writable([MainLog, SaslLog]), + {error, {{cannot_rotate_main_logs, _}, + {cannot_rotate_sasl_logs, _}}} = control_action(rotate_logs, []), + + %% logging directed to tty (handlers were removed in last test) + ok = clean_logs([MainLog, SaslLog], Suffix), + ok = application:set_env(sasl, sasl_error_logger, tty), + ok = application:set_env(kernel, error_logger, tty), + ok = control_action(rotate_logs, []), + [{error, enoent}, {error, enoent}] = empty_files([MainLog, SaslLog]), + + %% rotate logs when logging is turned off + ok = application:set_env(sasl, sasl_error_logger, false), + ok = application:set_env(kernel, error_logger, silent), + ok = control_action(rotate_logs, []), + [{error, enoent}, {error, enoent}] = empty_files([MainLog, SaslLog]), + + %% cleanup + ok = application:set_env(sasl, sasl_error_logger, {file, SaslLog}), + ok = application:set_env(kernel, error_logger, {file, MainLog}), + ok = add_log_handlers([{rabbit_error_logger_file_h, MainLog}, + {rabbit_sasl_report_file_h, SaslLog}]), + passed. + +test_log_management_during_startup() -> + MainLog = rabbit:log_location(kernel), + SaslLog = rabbit:log_location(sasl), + + %% start application with simple tty logging + ok = control_action(stop_app, []), + ok = application:set_env(kernel, error_logger, tty), + ok = application:set_env(sasl, sasl_error_logger, tty), + ok = add_log_handlers([{error_logger_tty_h, []}, + {sasl_report_tty_h, []}]), + ok = control_action(start_app, []), + + %% start application with tty logging and + %% proper handlers not installed + ok = control_action(stop_app, []), + ok = error_logger:tty(false), + ok = delete_log_handlers([sasl_report_tty_h]), + ok = case catch control_action(start_app, []) of + ok -> exit(got_success_but_expected_failure); + {error, {cannot_log_to_tty, _, _}} -> ok + end, + + %% fix sasl logging + ok = application:set_env(sasl, sasl_error_logger, + {file, SaslLog}), + + %% start application with logging to invalid directory + TmpLog = "/tmp/rabbit-tests/test.log", + file:delete(TmpLog), + ok = application:set_env(kernel, error_logger, {file, TmpLog}), + + ok = delete_log_handlers([rabbit_error_logger_file_h]), + ok = add_log_handlers([{error_logger_file_h, MainLog}]), + ok = case catch control_action(start_app, []) of + ok -> exit(got_success_but_expected_failure); + {error, {cannot_log_to_file, _, _}} -> ok + end, + + %% start application with standard error_logger_file_h + %% handler not installed + ok = application:set_env(kernel, error_logger, {file, MainLog}), + ok = control_action(start_app, []), + ok = control_action(stop_app, []), + + %% start application with standard sasl handler not installed + %% and rabbit main log handler installed correctly + ok = delete_log_handlers([rabbit_sasl_report_file_h]), + ok = control_action(start_app, []), + passed. + test_cluster_management() -> %% 'cluster' and 'reset' should only work if the app is stopped @@ -203,7 +335,6 @@ test_cluster_management() -> end, ok = control_action(start_app, []), - passed. test_cluster_management2(SecondaryNode) -> @@ -284,31 +415,12 @@ test_user_management() -> control_action(unmap_user_vhost, ["foo", "/"]), {error, {no_such_user, _}} = control_action(list_user_vhosts, ["foo"]), - {error, {no_such_user, _}} = - control_action(set_permissions, ["foo", "/", "/data"]), - {error, {no_such_user, _}} = - control_action(list_permissions, ["foo", "/"]), {error, {no_such_vhost, _}} = control_action(map_user_vhost, ["guest", "/testhost"]), {error, {no_such_vhost, _}} = control_action(unmap_user_vhost, ["guest", "/testhost"]), {error, {no_such_vhost, _}} = control_action(list_vhost_users, ["/testhost"]), - {error, {no_such_vhost, _}} = - control_action(set_permissions, ["guest", "/testhost", "/data"]), - {error, {no_such_vhost, _}} = - control_action(list_permissions, ["guest", "/testhost"]), - {error, {no_such_vhost, _}} = - control_action(add_realm, ["/testhost", "/data/test"]), - {error, {no_such_vhost, _}} = - control_action(delete_realm, ["/testhost", "/data/test"]), - {error, {no_such_vhost, _}} = - control_action(list_realms, ["/testhost"]), - {error, {no_such_realm, _}} = - control_action(set_permissions, ["guest", "/", "/data/test"]), - {error, {no_such_realm, _}} = - control_action(delete_realm, ["/", "/data/test"]), - %% user creation ok = control_action(add_user, ["foo", "bar"]), {error, {user_already_exists, _}} = @@ -327,32 +439,6 @@ test_user_management() -> ok = control_action(map_user_vhost, ["foo", "/testhost"]), ok = control_action(list_user_vhosts, ["foo"]), - %% realm creation - ok = control_action(add_realm, ["/testhost", "/data/test"]), - {error, {realm_already_exists, _}} = - control_action(add_realm, ["/testhost", "/data/test"]), - ok = control_action(list_realms, ["/testhost"]), - - %% user permissions - ok = control_action(set_permissions, - ["foo", "/testhost", "/data/test", - "passive", "active", "write", "read"]), - ok = control_action(list_permissions, ["foo", "/testhost"]), - ok = control_action(set_permissions, - ["foo", "/testhost", "/data/test", "all"]), - ok = control_action(set_permissions, - ["foo", "/testhost", "/data/test"]), - {error, not_mapped_to_vhost} = - control_action(set_permissions, - ["guest", "/testhost", "/data/test"]), - {error, not_mapped_to_vhost} = - control_action(list_permissions, ["guest", "/testhost"]), - - %% realm deletion - ok = control_action(delete_realm, ["/testhost", "/data/test"]), - {error, {no_such_realm, _}} = - control_action(delete_realm, ["/testhost", "/data/test"]), - %% user/vhost unmapping ok = control_action(unmap_user_vhost, ["foo", "/testhost"]), ok = control_action(unmap_user_vhost, ["foo", "/testhost"]), @@ -364,13 +450,7 @@ test_user_management() -> %% deleting a populated vhost ok = control_action(add_vhost, ["/testhost"]), - ok = control_action(add_realm, ["/testhost", "/data/test"]), ok = control_action(map_user_vhost, ["foo", "/testhost"]), - ok = control_action(set_permissions, - ["foo", "/testhost", "/data/test", "all"]), - _ = rabbit_amqqueue:declare( - rabbit_misc:r(<<"/testhost">>, realm, <<"/data/test">>), - <<"bar">>, true, false, []), ok = control_action(delete_vhost, ["/testhost"]), %% user deletion @@ -380,6 +460,8 @@ test_user_management() -> passed. +%--------------------------------------------------------------------- + control_action(Command, Args) -> control_action(Command, node(), Args). control_action(Command, Node, Args) -> @@ -391,3 +473,52 @@ control_action(Command, Node, Args) -> io:format("failed.~n"), Other end. + +empty_files(Files) -> + [case file:read_file_info(File) of + {ok, FInfo} -> FInfo#file_info.size == 0; + Error -> Error + end || File <- Files]. + +non_empty_files(Files) -> + [case EmptyFile of + {error, Reason} -> {error, Reason}; + _ -> not(EmptyFile) + end || EmptyFile <- empty_files(Files)]. + +test_logs_working(MainLogFile, SaslLogFile) -> + ok = rabbit_log:error("foo bar"), + ok = error_logger:error_report(crash_report, [foo, bar]), + %% give the error loggers some time to catch up + timer:sleep(50), + [true, true] = non_empty_files([MainLogFile, SaslLogFile]), + ok. + +clean_logs(Files, Suffix) -> + [begin + ok = delete_file(File), + ok = delete_file([File, Suffix]) + end || File <- Files], + ok. + +delete_file(File) -> + case file:delete(File) of + ok -> ok; + {error, enoent} -> ok; + Error -> Error + end. + +make_files_non_writable(Files) -> + [ok = file:write_file_info(File, #file_info{mode=0}) || + File <- Files], + ok. + +add_log_handlers(Handlers) -> + [ok = error_logger:add_report_handler(Handler, Args) || + {Handler, Args} <- Handlers], + ok. + +delete_log_handlers(Handlers) -> + [[] = error_logger:delete_report_handler(Handler) || + Handler <- Handlers], + ok. diff --git a/src/rabbit_ticket.erl b/src/rabbit_ticket.erl deleted file mode 100644 index 3a608faa..00000000 --- a/src/rabbit_ticket.erl +++ /dev/null @@ -1,131 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd., -%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd., Cohesive Financial Technologies -%% LLC., and Rabbit Technologies Ltd. are Copyright (C) 2007-2008 -%% LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit -%% Technologies Ltd.; -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_ticket). --include("rabbit.hrl"). - --export([record_ticket/2, lookup_ticket/4, check_ticket/4]). - --import(application). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --type(ticket_number() :: non_neg_integer()). -%% we'd like to write #ticket.passive_flag | #ticket.active_flag | ... -%% but dialyzer doesn't support that. --type(ticket_field() :: 3..6). - --spec(record_ticket/2 :: (ticket_number(), ticket()) -> 'ok'). --spec(lookup_ticket/4 :: - (ticket_number(), ticket_field(), username(), vhost()) -> - ticket()). --spec(check_ticket/4 :: - (ticket_number(), ticket_field(), r('exchange' | 'queue'), username()) -> - 'ok'). - --endif. - -%%---------------------------------------------------------------------------- - -record_ticket(TicketNumber, Ticket) -> - put({ticket, TicketNumber}, Ticket), - ok. - -lookup_ticket(TicketNumber, FieldIndex, Username, VHostPath) -> - case get({ticket, TicketNumber}) of - undefined -> - %% Spec: "The server MUST isolate access tickets per - %% channel and treat an attempt by a client to mix these - %% as a connection exception." - rabbit_log:warning("Attempt by client to use invalid ticket ~p~n", [TicketNumber]), - maybe_relax_checks(TicketNumber, Username, VHostPath); - Ticket = #ticket{} -> - case element(FieldIndex, Ticket) of - false -> rabbit_misc:protocol_error( - access_refused, - "ticket ~w has insufficient permissions", - [TicketNumber]); - true -> Ticket - end - end. - -maybe_relax_checks(TicketNumber, Username, VHostPath) -> - case rabbit_misc:strict_ticket_checking() of - true -> - rabbit_misc:protocol_error( - access_refused, "invalid ticket ~w", [TicketNumber]); - false -> - rabbit_log:warning("Lax ticket check mode: fabricating full ticket ~p for user ~p, vhost ~p~n", - [TicketNumber, Username, VHostPath]), - Ticket = rabbit_access_control:full_ticket( - rabbit_misc:r(VHostPath, realm, <<"/data">>)), - case rabbit_realm:access_request(Username, false, Ticket) of - ok -> record_ticket(TicketNumber, Ticket), - Ticket; - {error, Reason} -> - rabbit_misc:protocol_error( - Reason, - "fabrication of ticket ~w for user '~s' in vhost '~s' failed", - [TicketNumber, Username, VHostPath]) - end - end. - -check_ticket(TicketNumber, FieldIndex, - Name = #resource{virtual_host = VHostPath}, Username) -> - #ticket{realm_name = RealmName} = - lookup_ticket(TicketNumber, FieldIndex, Username, VHostPath), - case resource_in_realm(RealmName, Name) of - false -> - case rabbit_misc:strict_ticket_checking() of - true -> - rabbit_misc:protocol_error( - access_refused, - "insufficient permissions in ticket ~w to access ~s in ~s", - [TicketNumber, rabbit_misc:rs(Name), - rabbit_misc:rs(RealmName)]); - false -> - rabbit_log:warning("Lax ticket check mode: ignoring cross-realm access for ticket ~p~n", [TicketNumber]), - ok - end; - true -> - ok - end. - -resource_in_realm(RealmName, ResourceName = #resource{kind = Kind}) -> - CacheKey = {resource_cache, RealmName, Kind}, - case get(CacheKey) of - Name when Name == ResourceName -> - true; - _ -> - case rabbit_realm:check(RealmName, ResourceName) of - true -> - put(CacheKey, ResourceName), - true; - _ -> - false - end - end. diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index c3c7db53..2c7fa2ab 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -157,10 +157,15 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) -> %% when these are full. So the fact that we process the result %% asynchronously does not impact flow control. internal_send_command_async(Sock, Channel, MethodRecord) -> - true = erlang:port_command(Sock, assemble_frames(Channel, MethodRecord)), + true = port_cmd(Sock, assemble_frames(Channel, MethodRecord)), ok. internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) -> - true = erlang:port_command(Sock, assemble_frames(Channel, MethodRecord, - Content, FrameMax)), + true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, + Content, FrameMax)), ok. + +port_cmd(Sock, Data) -> + try erlang:port_command(Sock, Data) + catch error:Error -> exit({writer, send_failed, Error}) + end. |
