summaryrefslogtreecommitdiff
path: root/src/rabbit_basic.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_basic.erl')
-rw-r--r--src/rabbit_basic.erl104
1 files changed, 76 insertions, 28 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 734456d3..2e825536 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -10,17 +10,18 @@
%%
%% The Original Code is RabbitMQ.
%%
-%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
%%
-module(rabbit_basic).
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([publish/4, publish/6, publish/1,
- message/3, message/4, properties/1, append_table_header/3,
- extract_headers/1, map_headers/2, delivery/4, header_routes/1]).
+-export([publish/4, publish/5, publish/1,
+ message/3, message/4, properties/1, prepend_table_header/3,
+ extract_headers/1, map_headers/2, delivery/3, header_routes/1,
+ parse_expiration/1]).
-export([build_content/2, from_content/1]).
%%----------------------------------------------------------------------------
@@ -40,13 +41,13 @@
-spec(publish/4 ::
(exchange_input(), rabbit_router:routing_key(), properties_input(),
body_input()) -> publish_result()).
--spec(publish/6 ::
- (exchange_input(), rabbit_router:routing_key(), boolean(), boolean(),
+-spec(publish/5 ::
+ (exchange_input(), rabbit_router:routing_key(), boolean(),
properties_input(), body_input()) -> publish_result()).
-spec(publish/1 ::
(rabbit_types:delivery()) -> publish_result()).
--spec(delivery/4 ::
- (boolean(), boolean(), rabbit_types:message(), undefined | integer()) ->
+-spec(delivery/3 ::
+ (boolean(), rabbit_types:message(), undefined | integer()) ->
rabbit_types:delivery()).
-spec(message/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
@@ -58,7 +59,7 @@
-spec(properties/1 ::
(properties_input()) -> rabbit_framing:amqp_property_record()).
--spec(append_table_header/3 ::
+-spec(prepend_table_header/3 ::
(binary(), rabbit_framing:amqp_table(), headers()) -> headers()).
-spec(extract_headers/1 :: (rabbit_types:content()) -> headers()).
@@ -72,6 +73,9 @@
binary() | [binary()]) -> rabbit_types:content()).
-spec(from_content/1 :: (rabbit_types:content()) ->
{rabbit_framing:amqp_property_record(), binary()}).
+-spec(parse_expiration/1 ::
+ (rabbit_framing:amqp_property_record())
+ -> rabbit_types:ok_or_error2('undefined' | non_neg_integer(), any())).
-endif.
@@ -80,18 +84,16 @@
%% Convenience function, for avoiding round-trips in calls across the
%% erlang distributed network.
publish(Exchange, RoutingKeyBin, Properties, Body) ->
- publish(Exchange, RoutingKeyBin, false, false, Properties, Body).
+ publish(Exchange, RoutingKeyBin, false, Properties, Body).
%% Convenience function, for avoiding round-trips in calls across the
%% erlang distributed network.
-publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Props, Body) ->
- publish(X, delivery(Mandatory, Immediate,
- message(XName, RKey, properties(Props), Body),
- undefined));
-publish(XName, RKey, Mandatory, Immediate, Props, Body) ->
- publish(delivery(Mandatory, Immediate,
- message(XName, RKey, properties(Props), Body),
- undefined)).
+publish(X = #exchange{name = XName}, RKey, Mandatory, Props, Body) ->
+ Message = message(XName, RKey, properties(Props), Body),
+ publish(X, delivery(Mandatory, Message, undefined));
+publish(XName, RKey, Mandatory, Props, Body) ->
+ Message = message(XName, RKey, properties(Props), Body),
+ publish(delivery(Mandatory, Message, undefined)).
publish(Delivery = #delivery{
message = #basic_message{exchange_name = XName}}) ->
@@ -105,8 +107,8 @@ publish(X, Delivery) ->
{RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver(Qs, Delivery),
{ok, RoutingRes, DeliveredQPids}.
-delivery(Mandatory, Immediate, Message, MsgSeqNo) ->
- #delivery{mandatory = Mandatory, immediate = Immediate, sender = self(),
+delivery(Mandatory, Message, MsgSeqNo) ->
+ #delivery{mandatory = Mandatory, sender = self(),
message = Message, msg_seq_no = MsgSeqNo}.
build_content(Properties, BodyBin) when is_binary(BodyBin) ->
@@ -179,15 +181,45 @@ properties(P) when is_list(P) ->
end
end, #'P_basic'{}, P).
-append_table_header(Name, Info, undefined) ->
- append_table_header(Name, Info, []);
-append_table_header(Name, Info, Headers) ->
- Prior = case rabbit_misc:table_lookup(Headers, Name) of
- undefined -> [];
- {array, Existing} -> Existing
- end,
+prepend_table_header(Name, Info, undefined) ->
+ prepend_table_header(Name, Info, []);
+prepend_table_header(Name, Info, Headers) ->
+ case rabbit_misc:table_lookup(Headers, Name) of
+ {array, Existing} ->
+ prepend_table(Name, Info, Existing, Headers);
+ undefined ->
+ prepend_table(Name, Info, [], Headers);
+ Other ->
+ Headers2 = prepend_table(Name, Info, [], Headers),
+ set_invalid_header(Name, Other, Headers2)
+ end.
+
+prepend_table(Name, Info, Prior, Headers) ->
rabbit_misc:set_table_value(Headers, Name, array, [{table, Info} | Prior]).
+set_invalid_header(Name, {_, _}=Value, Headers) when is_list(Headers) ->
+ case rabbit_misc:table_lookup(Headers, ?INVALID_HEADERS_KEY) of
+ undefined ->
+ set_invalid([{Name, array, [Value]}], Headers);
+ {table, ExistingHdr} ->
+ update_invalid(Name, Value, ExistingHdr, Headers);
+ Other ->
+ %% somehow the x-invalid-headers header is corrupt
+ Invalid = [{?INVALID_HEADERS_KEY, array, [Other]}],
+ set_invalid_header(Name, Value, set_invalid(Invalid, Headers))
+ end.
+
+set_invalid(NewHdr, Headers) ->
+ rabbit_misc:set_table_value(Headers, ?INVALID_HEADERS_KEY, table, NewHdr).
+
+update_invalid(Name, Value, ExistingHdr, Header) ->
+ Values = case rabbit_misc:table_lookup(ExistingHdr, Name) of
+ undefined -> [Value];
+ {array, Prior} -> [Value | Prior]
+ end,
+ NewHdr = rabbit_misc:set_table_value(ExistingHdr, Name, array, Values),
+ set_invalid(NewHdr, Header).
+
extract_headers(Content) ->
#content{properties = #'P_basic'{headers = Headers}} =
rabbit_binary_parser:ensure_content_decoded(Content),
@@ -226,3 +258,19 @@ header_routes(HeadersTable) ->
{Type, _Val} -> throw({error, {unacceptable_type_in_header,
binary_to_list(HeaderKey), Type}})
end || HeaderKey <- ?ROUTING_HEADERS]).
+
+parse_expiration(#'P_basic'{expiration = undefined}) ->
+ {ok, undefined};
+parse_expiration(#'P_basic'{expiration = Expiration}) ->
+ case string:to_integer(binary_to_list(Expiration)) of
+ {error, no_integer} = E ->
+ E;
+ {N, ""} ->
+ case rabbit_misc:check_expiry(N) of
+ ok -> {ok, N};
+ E = {error, _} -> E
+ end;
+ {_, S} ->
+ {error, {leftover_string, S}}
+ end.
+