diff options
Diffstat (limited to 'src/rabbit_basic.erl')
-rw-r--r-- | src/rabbit_basic.erl | 104 |
1 files changed, 76 insertions, 28 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 734456d3..2e825536 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -10,17 +10,18 @@ %% %% The Original Code is RabbitMQ. %% -%% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved. %% -module(rabbit_basic). -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([publish/4, publish/6, publish/1, - message/3, message/4, properties/1, append_table_header/3, - extract_headers/1, map_headers/2, delivery/4, header_routes/1]). +-export([publish/4, publish/5, publish/1, + message/3, message/4, properties/1, prepend_table_header/3, + extract_headers/1, map_headers/2, delivery/3, header_routes/1, + parse_expiration/1]). -export([build_content/2, from_content/1]). %%---------------------------------------------------------------------------- @@ -40,13 +41,13 @@ -spec(publish/4 :: (exchange_input(), rabbit_router:routing_key(), properties_input(), body_input()) -> publish_result()). --spec(publish/6 :: - (exchange_input(), rabbit_router:routing_key(), boolean(), boolean(), +-spec(publish/5 :: + (exchange_input(), rabbit_router:routing_key(), boolean(), properties_input(), body_input()) -> publish_result()). -spec(publish/1 :: (rabbit_types:delivery()) -> publish_result()). --spec(delivery/4 :: - (boolean(), boolean(), rabbit_types:message(), undefined | integer()) -> +-spec(delivery/3 :: + (boolean(), rabbit_types:message(), undefined | integer()) -> rabbit_types:delivery()). -spec(message/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), @@ -58,7 +59,7 @@ -spec(properties/1 :: (properties_input()) -> rabbit_framing:amqp_property_record()). --spec(append_table_header/3 :: +-spec(prepend_table_header/3 :: (binary(), rabbit_framing:amqp_table(), headers()) -> headers()). -spec(extract_headers/1 :: (rabbit_types:content()) -> headers()). @@ -72,6 +73,9 @@ binary() | [binary()]) -> rabbit_types:content()). -spec(from_content/1 :: (rabbit_types:content()) -> {rabbit_framing:amqp_property_record(), binary()}). +-spec(parse_expiration/1 :: + (rabbit_framing:amqp_property_record()) + -> rabbit_types:ok_or_error2('undefined' | non_neg_integer(), any())). -endif. @@ -80,18 +84,16 @@ %% Convenience function, for avoiding round-trips in calls across the %% erlang distributed network. publish(Exchange, RoutingKeyBin, Properties, Body) -> - publish(Exchange, RoutingKeyBin, false, false, Properties, Body). + publish(Exchange, RoutingKeyBin, false, Properties, Body). %% Convenience function, for avoiding round-trips in calls across the %% erlang distributed network. -publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Props, Body) -> - publish(X, delivery(Mandatory, Immediate, - message(XName, RKey, properties(Props), Body), - undefined)); -publish(XName, RKey, Mandatory, Immediate, Props, Body) -> - publish(delivery(Mandatory, Immediate, - message(XName, RKey, properties(Props), Body), - undefined)). +publish(X = #exchange{name = XName}, RKey, Mandatory, Props, Body) -> + Message = message(XName, RKey, properties(Props), Body), + publish(X, delivery(Mandatory, Message, undefined)); +publish(XName, RKey, Mandatory, Props, Body) -> + Message = message(XName, RKey, properties(Props), Body), + publish(delivery(Mandatory, Message, undefined)). publish(Delivery = #delivery{ message = #basic_message{exchange_name = XName}}) -> @@ -105,8 +107,8 @@ publish(X, Delivery) -> {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver(Qs, Delivery), {ok, RoutingRes, DeliveredQPids}. -delivery(Mandatory, Immediate, Message, MsgSeqNo) -> - #delivery{mandatory = Mandatory, immediate = Immediate, sender = self(), +delivery(Mandatory, Message, MsgSeqNo) -> + #delivery{mandatory = Mandatory, sender = self(), message = Message, msg_seq_no = MsgSeqNo}. build_content(Properties, BodyBin) when is_binary(BodyBin) -> @@ -179,15 +181,45 @@ properties(P) when is_list(P) -> end end, #'P_basic'{}, P). -append_table_header(Name, Info, undefined) -> - append_table_header(Name, Info, []); -append_table_header(Name, Info, Headers) -> - Prior = case rabbit_misc:table_lookup(Headers, Name) of - undefined -> []; - {array, Existing} -> Existing - end, +prepend_table_header(Name, Info, undefined) -> + prepend_table_header(Name, Info, []); +prepend_table_header(Name, Info, Headers) -> + case rabbit_misc:table_lookup(Headers, Name) of + {array, Existing} -> + prepend_table(Name, Info, Existing, Headers); + undefined -> + prepend_table(Name, Info, [], Headers); + Other -> + Headers2 = prepend_table(Name, Info, [], Headers), + set_invalid_header(Name, Other, Headers2) + end. + +prepend_table(Name, Info, Prior, Headers) -> rabbit_misc:set_table_value(Headers, Name, array, [{table, Info} | Prior]). +set_invalid_header(Name, {_, _}=Value, Headers) when is_list(Headers) -> + case rabbit_misc:table_lookup(Headers, ?INVALID_HEADERS_KEY) of + undefined -> + set_invalid([{Name, array, [Value]}], Headers); + {table, ExistingHdr} -> + update_invalid(Name, Value, ExistingHdr, Headers); + Other -> + %% somehow the x-invalid-headers header is corrupt + Invalid = [{?INVALID_HEADERS_KEY, array, [Other]}], + set_invalid_header(Name, Value, set_invalid(Invalid, Headers)) + end. + +set_invalid(NewHdr, Headers) -> + rabbit_misc:set_table_value(Headers, ?INVALID_HEADERS_KEY, table, NewHdr). + +update_invalid(Name, Value, ExistingHdr, Header) -> + Values = case rabbit_misc:table_lookup(ExistingHdr, Name) of + undefined -> [Value]; + {array, Prior} -> [Value | Prior] + end, + NewHdr = rabbit_misc:set_table_value(ExistingHdr, Name, array, Values), + set_invalid(NewHdr, Header). + extract_headers(Content) -> #content{properties = #'P_basic'{headers = Headers}} = rabbit_binary_parser:ensure_content_decoded(Content), @@ -226,3 +258,19 @@ header_routes(HeadersTable) -> {Type, _Val} -> throw({error, {unacceptable_type_in_header, binary_to_list(HeaderKey), Type}}) end || HeaderKey <- ?ROUTING_HEADERS]). + +parse_expiration(#'P_basic'{expiration = undefined}) -> + {ok, undefined}; +parse_expiration(#'P_basic'{expiration = Expiration}) -> + case string:to_integer(binary_to_list(Expiration)) of + {error, no_integer} = E -> + E; + {N, ""} -> + case rabbit_misc:check_expiry(N) of + ok -> {ok, N}; + E = {error, _} -> E + end; + {_, S} -> + {error, {leftover_string, S}} + end. + |