diff options
Diffstat (limited to 'src/rabbit_basic.erl')
-rw-r--r-- | src/rabbit_basic.erl | 71 |
1 files changed, 52 insertions, 19 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index c5bd9575..3cf73e80 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -18,10 +18,9 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([publish/1, message/4, properties/1, delivery/5]). +-export([publish/1, message/3, message/4, properties/1, delivery/5]). -export([publish/4, publish/7]). -export([build_content/2, from_content/1]). --export([is_message_persistent/1]). %%---------------------------------------------------------------------------- @@ -41,8 +40,11 @@ rabbit_types:delivery()). -spec(message/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), - properties_input(), binary()) -> - (rabbit_types:message() | rabbit_types:error(any()))). + properties_input(), binary()) -> rabbit_types:message()). +-spec(message/3 :: + (rabbit_exchange:name(), rabbit_router:routing_key(), + rabbit_types:decoded_content()) -> + rabbit_types:ok_or_error2(rabbit_types:message(), any())). -spec(properties/1 :: (properties_input()) -> rabbit_framing:amqp_property_record()). -spec(publish/4 :: @@ -56,9 +58,6 @@ rabbit_types:content()). -spec(from_content/1 :: (rabbit_types:content()) -> {rabbit_framing:amqp_property_record(), binary()}). --spec(is_message_persistent/1 :: (rabbit_types:decoded_content()) -> - (boolean() | - {'invalid', non_neg_integer()})). -endif. @@ -98,19 +97,40 @@ from_content(Content) -> rabbit_framing_amqp_0_9_1:method_id('basic.publish'), {Props, list_to_binary(lists:reverse(FragmentsRev))}. -message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) -> +%% This breaks the spec rule forbidding message modification +strip_header(#content{properties = #'P_basic'{headers = undefined}} + = DecodedContent, _Key) -> + DecodedContent; +strip_header(#content{properties = Props = #'P_basic'{headers = Headers}} + = DecodedContent, Key) -> + case lists:keysearch(Key, 1, Headers) of + false -> DecodedContent; + {value, Found} -> Headers0 = lists:delete(Found, Headers), + rabbit_binary_generator:clear_encoded_content( + DecodedContent#content{ + properties = Props#'P_basic'{ + headers = Headers0}}) + end. + +message(ExchangeName, RoutingKey, + #content{properties = Props} = DecodedContent) -> + try + {ok, #basic_message{ + exchange_name = ExchangeName, + content = strip_header(DecodedContent, ?DELETED_HEADER), + id = rabbit_guid:guid(), + is_persistent = is_message_persistent(DecodedContent), + routing_keys = [RoutingKey | + header_routes(Props#'P_basic'.headers)]}} + catch + {error, _Reason} = Error -> Error + end. + +message(ExchangeName, RoutingKey, RawProperties, BodyBin) -> Properties = properties(RawProperties), Content = build_content(Properties, BodyBin), - case is_message_persistent(Content) of - {invalid, Other} -> - {error, {invalid_delivery_mode, Other}}; - IsPersistent when is_boolean(IsPersistent) -> - #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKeyBin, - content = Content, - guid = rabbit_guid:guid(), - is_persistent = IsPersistent} - end. + {ok, Msg} = message(ExchangeName, RoutingKey, Content), + Msg. properties(P = #'P_basic'{}) -> P; @@ -152,5 +172,18 @@ is_message_persistent(#content{properties = #'P_basic'{ 1 -> false; 2 -> true; undefined -> false; - Other -> {invalid, Other} + Other -> throw({error, {delivery_mode_unknown, Other}}) end. + +%% Extract CC routes from headers +header_routes(undefined) -> + []; +header_routes(HeadersTable) -> + lists:append( + [case rabbit_misc:table_lookup(HeadersTable, HeaderKey) of + {array, Routes} -> [Route || {longstr, Route} <- Routes]; + undefined -> []; + {Type, _Val} -> throw({error, {unacceptable_type_in_header, + Type, + binary_to_list(HeaderKey)}}) + end || HeaderKey <- ?ROUTING_HEADERS]). |