diff options
-rw-r--r-- | codegen.py | 10 | ||||
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/rabbit_binary_generator.erl | 47 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 9 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 43 |
5 files changed, 67 insertions, 44 deletions
@@ -75,6 +75,8 @@ def erlangize(s): AmqpMethod.erlangName = lambda m: "'" + erlangize(m.klass.name) + '.' + erlangize(m.name) + "'" +AmqpClass.erlangName = lambda c: "'" + erlangize(c.name) + "'" + def erlangConstantName(s): return '_'.join(re.split('[- ]', s.upper())) @@ -167,6 +169,9 @@ def genErl(spec): def genLookupMethodName(m): print "lookup_method_name({%d, %d}) -> %s;" % (m.klass.index, m.index, m.erlangName()) + def genLookupClassName(c): + print "lookup_class_name(%d) -> %s;" % (c.index, c.erlangName()) + def genMethodId(m): print "method_id(%s) -> {%d, %d};" % (m.erlangName(), m.klass.index, m.index) @@ -325,6 +330,8 @@ def genErl(spec): -export([version/0]). -export([lookup_method_name/1]). +-export([lookup_class_name/1]). + -export([method_id/1]). -export([method_has_content/1]). -export([is_method_synchronous/1]). @@ -427,6 +434,9 @@ bitvalue(undefined) -> 0. for m in methods: genLookupMethodName(m) print "lookup_method_name({_ClassId, _MethodId} = Id) -> exit({unknown_method_id, Id})." + for c in spec.allClasses(): genLookupClassName(c) + print "lookup_class_name(ClassId) -> exit({unknown_class_id, ClassId})." + for m in methods: genMethodId(m) print "method_id(Name) -> exit({unknown_method_name, Name})." diff --git a/include/rabbit.hrl b/include/rabbit.hrl index bce9dfa3..af6e257a 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -70,7 +70,7 @@ -record(ssl_socket, {tcp, ssl}). -record(delivery, {mandatory, immediate, txn, sender, message}). --record(amqp_error, {name, explanation, method = none}). +-record(amqp_error, {name, explanation = "", method = none}). -record(event, {type, props, timestamp}). diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index 056ab1b5..722573c7 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -47,6 +47,7 @@ -export([generate_table/1, encode_properties/2]). -export([check_empty_content_body_frame_size/0]). -export([ensure_content_encoded/2, clear_encoded_content/1]). +-export([map_exception/3]). -import(lists). @@ -74,6 +75,9 @@ rabbit_types:encoded_content()). -spec(clear_encoded_content/1 :: (rabbit_types:content()) -> rabbit_types:unencoded_content()). +-spec(map_exception/3 :: (non_neg_integer(), rabbit_types:amqp_error(), + rabbit_types:protocol()) -> + {boolean(), non_neg_integer(), rabbit_framing:amqp_method()}). -endif. @@ -306,3 +310,46 @@ clear_encoded_content(Content = #content{properties = none}) -> Content; clear_encoded_content(Content = #content{}) -> Content#content{properties_bin = none, protocol = none}. + +%% NB: this function is also used by the Erlang client +map_exception(Channel, Reason, Protocol) -> + {SuggestedClose, ReplyCode, ReplyText, FailedMethod} = + lookup_amqp_exception(Reason, Protocol), + ShouldClose = SuggestedClose orelse (Channel == 0), + {ClassId, MethodId} = case FailedMethod of + {_, _} -> FailedMethod; + none -> {0, 0}; + _ -> Protocol:method_id(FailedMethod) + end, + {CloseChannel, CloseMethod} = + case ShouldClose of + true -> {0, #'connection.close'{reply_code = ReplyCode, + reply_text = ReplyText, + class_id = ClassId, + method_id = MethodId}}; + false -> {Channel, #'channel.close'{reply_code = ReplyCode, + reply_text = ReplyText, + class_id = ClassId, + method_id = MethodId}} + end, + {ShouldClose, CloseChannel, CloseMethod}. + +lookup_amqp_exception(#amqp_error{name = Name, + explanation = Expl, + method = Method}, + Protocol) -> + {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(Name), + ExplBin = amqp_exception_explanation(Text, Expl), + {ShouldClose, Code, ExplBin, Method}; +lookup_amqp_exception(Other, Protocol) -> + rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]), + {ShouldClose, Code, Text} = + Protocol:lookup_amqp_exception(internal_error, Protocol), + {ShouldClose, Code, Text, none}. + +amqp_exception_explanation(Text, Expl) -> + ExplBin = list_to_binary(Expl), + CompleteTextBin = <<Text/binary, " - ", ExplBin/binary>>, + if size(CompleteTextBin) > 255 -> <<CompleteTextBin:252/binary, "...">>; + true -> CompleteTextBin + end. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 33d13978..577d206d 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -44,6 +44,9 @@ -include("rabbit.hrl"). +-define(SCHEMA_VERSION_SET, []). +-define(SCHEMA_VERSION_FILENAME, "schema_version"). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -91,6 +94,9 @@ init() -> ok = ensure_mnesia_running(), ok = ensure_mnesia_dir(), ok = init_db(read_cluster_nodes_config(), true), + ok = rabbit_misc:write_term_file(filename:join( + dir(), ?SCHEMA_VERSION_FILENAME), + [?SCHEMA_VERSION_SET]), ok. is_db_empty() -> @@ -243,7 +249,8 @@ ensure_mnesia_dir() -> case filelib:ensure_dir(MnesiaDir) of {error, Reason} -> throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}}); - ok -> ok + ok -> + ok end. ensure_mnesia_running() -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index ff0fb8f7..e500b111 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -906,7 +906,7 @@ handle_exception(State = #v1{connection_state = CS}, Channel, Reason) -> send_exception(State = #v1{connection = #connection{protocol = Protocol}}, Channel, Reason) -> {ShouldClose, CloseChannel, CloseMethod} = - map_exception(Channel, Reason, Protocol), + rabbit_binary_generator:map_exception(Channel, Reason, Protocol), NewState = case ShouldClose of true -> terminate_channels(), close_connection(State); @@ -916,47 +916,6 @@ send_exception(State = #v1{connection = #connection{protocol = Protocol}}, NewState#v1.sock, CloseChannel, CloseMethod, Protocol), NewState. -map_exception(Channel, Reason, Protocol) -> - {SuggestedClose, ReplyCode, ReplyText, FailedMethod} = - lookup_amqp_exception(Reason, Protocol), - ShouldClose = SuggestedClose or (Channel == 0), - {ClassId, MethodId} = case FailedMethod of - {_, _} -> FailedMethod; - none -> {0, 0}; - _ -> Protocol:method_id(FailedMethod) - end, - {CloseChannel, CloseMethod} = - case ShouldClose of - true -> {0, #'connection.close'{reply_code = ReplyCode, - reply_text = ReplyText, - class_id = ClassId, - method_id = MethodId}}; - false -> {Channel, #'channel.close'{reply_code = ReplyCode, - reply_text = ReplyText, - class_id = ClassId, - method_id = MethodId}} - end, - {ShouldClose, CloseChannel, CloseMethod}. - -lookup_amqp_exception(#amqp_error{name = Name, - explanation = Expl, - method = Method}, - Protocol) -> - {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(Name), - ExplBin = amqp_exception_explanation(Text, Expl), - {ShouldClose, Code, ExplBin, Method}; -lookup_amqp_exception(Other, Protocol) -> - rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]), - {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(internal_error), - {ShouldClose, Code, Text, none}. - -amqp_exception_explanation(Text, Expl) -> - ExplBin = list_to_binary(Expl), - CompleteTextBin = <<Text/binary, " - ", ExplBin/binary>>, - if size(CompleteTextBin) > 255 -> <<CompleteTextBin:252/binary, "...">>; - true -> CompleteTextBin - end. - internal_emit_stats(State = #v1{stats_timer = StatsTimer}) -> rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)), State#v1{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}. |