From de9d85c3fa0d73fa51e6754ebfab268e8100e71e Mon Sep 17 00:00:00 2001 From: Francesco Mazzoli Date: Tue, 3 Apr 2012 14:04:25 +0100 Subject: map_headers instead of {extract, replace}_headers. Sadly the final value of the headers depends on whether we have a DLX routing key or not, so the code in rabbit_amqqueue_process:make_dead_letter_msg/4 is slightly ugly. I still think it's better to keep rabbit_basic:map_headers/2 general. --- Makefile | 2 +- src/rabbit_amqqueue_process.erl | 38 +++++++++++++++++++++----------------- src/rabbit_basic.erl | 20 ++++++++------------ 3 files changed, 30 insertions(+), 30 deletions(-) diff --git a/Makefile b/Makefile index d16cd4d0..80eb6a30 100644 --- a/Makefile +++ b/Makefile @@ -207,7 +207,7 @@ start-background-node: all -rm -f $(RABBITMQ_MNESIA_DIR).pid mkdir -p $(RABBITMQ_MNESIA_DIR) setsid sh -c "$(MAKE) run-background-node > $(RABBITMQ_MNESIA_DIR)/startup_log 2> $(RABBITMQ_MNESIA_DIR)/startup_err" & - sleep 1 + sleep 5 start-rabbit-on-node: all echo "rabbit:start()." | $(ERL_CALL) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7c1e4573..b7161a05 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -836,29 +836,33 @@ make_dead_letter_msg(DLX, Reason, exchange_name = Exchange, routing_keys = RoutingKeys}, State = #q{dlx_routing_key = DlxRoutingKey}) -> - Headers = rabbit_basic:extract_headers(Content), - #resource{name = QName} = qname(State), - %% The first routing key is the one specified in the - %% basic.publish; all others are CC or BCC keys. - RoutingKeys1 = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], - Info = [{<<"reason">>, longstr, list_to_binary(atom_to_list(Reason))}, - {<<"queue">>, longstr, QName}, - {<<"time">>, timestamp, rabbit_misc:now_ms() div 1000}, - {<<"exchange">>, longstr, Exchange#resource.name}, - {<<"routing-keys">>, array, - [{longstr, Key} || Key <- RoutingKeys1]}], - Headers1 = rabbit_basic:append_table_header(<<"x-death">>, Info, Headers), - {DeathRoutingKeys, Headers2} = + {DeathRoutingKeys, HeadersFun1} = case DlxRoutingKey of - undefined -> {RoutingKeys, Headers1}; + undefined -> {RoutingKeys, fun (H) -> H end}; _ -> {[DlxRoutingKey], - lists:keydelete(<<"CC">>, 1, Headers1)} + fun (H) -> lists:keydelete(<<"CC">>, 1, H) end} + end, + #resource{name = QName} = qname(State), + HeadersFun2 = + fun (Headers) -> + %% The first routing key is the one specified in the + %% basic.publish; all others are CC or BCC keys. + RoutingKeys1 = + [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], + Info = [{<<"reason">>, + longstr, list_to_binary(atom_to_list(Reason))}, + {<<"queue">>, longstr, QName}, + {<<"time">>, timestamp, rabbit_misc:now_ms() div 1000}, + {<<"exchange">>, longstr, Exchange#resource.name}, + {<<"routing-keys">>, array, + [{longstr, Key} || Key <- RoutingKeys1]}], + HeadersFun1(rabbit_basic:append_table_header(<<"x-death">>, + Info, Headers)) end, - Content1 = rabbit_basic:replace_headers(Headers2, Content), + Content1 = rabbit_basic:map_headers(HeadersFun2, Content), Msg#basic_message{exchange_name = DLX, id = rabbit_guid:gen(), routing_keys = DeathRoutingKeys, content = Content1}. - now_micros() -> timer:now_diff(now(), {0,0,0}). infos(Items, State) -> diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index a89aa074..1718a420 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -20,7 +20,7 @@ -export([publish/4, publish/6, publish/1, message/3, message/4, properties/1, append_table_header/3, - extract_headers/1, replace_headers/2, delivery/4, header_routes/1]). + map_headers/2, delivery/4, header_routes/1]). -export([build_content/2, from_content/1]). %%---------------------------------------------------------------------------- @@ -61,10 +61,8 @@ -spec(append_table_header/3 :: (binary(), rabbit_framing:amqp_table(), headers()) -> headers()). --spec(extract_headers/1 :: (rabbit_types:content()) -> headers()). - --spec(replace_headers/2 :: (headers(), rabbit_types:content()) - -> rabbit_types:content()). +-spec(map_headers/2 :: (rabbit_types:content(), fun((headers()) -> headers())) + -> rabbit_types:content()). -spec(header_routes/1 :: (undefined | rabbit_framing:amqp_table()) -> [string()]). @@ -188,14 +186,12 @@ append_table_header(Name, Info, Headers) -> end, rabbit_misc:set_table_value(Headers, Name, array, [{table, Info} | Prior]). -extract_headers(Content) -> - #content{properties = #'P_basic'{headers = Headers}} = - rabbit_binary_parser:ensure_content_decoded(Content), - Headers. - -replace_headers(Headers, Content = #content{properties = Props}) -> +map_headers(F, Content) -> + Content = rabbit_binary_parser:ensure_content_decoded(Content), + #content{properties = #'P_basic'{headers = Headers} = Props} = Content, + Headers1 = F(Headers), rabbit_binary_generator:clear_encoded_content( - Content#content{properties = Props#'P_basic'{headers = Headers}}). + Content#content{properties = Props#'P_basic'{headers = Headers1}}). indexof(L, Element) -> indexof(L, Element, 1). -- cgit v1.2.1 From 05122f0ef6236f58c04fc366d4a75ac60a8850f0 Mon Sep 17 00:00:00 2001 From: Francesco Mazzoli Date: Tue, 3 Apr 2012 14:05:32 +0100 Subject: Pushed a change to the Makefile by mistake. --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 80eb6a30..d16cd4d0 100644 --- a/Makefile +++ b/Makefile @@ -207,7 +207,7 @@ start-background-node: all -rm -f $(RABBITMQ_MNESIA_DIR).pid mkdir -p $(RABBITMQ_MNESIA_DIR) setsid sh -c "$(MAKE) run-background-node > $(RABBITMQ_MNESIA_DIR)/startup_log 2> $(RABBITMQ_MNESIA_DIR)/startup_err" & - sleep 5 + sleep 1 start-rabbit-on-node: all echo "rabbit:start()." | $(ERL_CALL) -- cgit v1.2.1 From 1fe82ee9b57658fe2861144da9eb0e219da616e4 Mon Sep 17 00:00:00 2001 From: Francesco Mazzoli Date: Tue, 3 Apr 2012 16:05:24 +0100 Subject: Fix in map_headers. --- src/rabbit_basic.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 1718a420..cc876cb4 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -187,11 +187,11 @@ append_table_header(Name, Info, Headers) -> rabbit_misc:set_table_value(Headers, Name, array, [{table, Info} | Prior]). map_headers(F, Content) -> - Content = rabbit_binary_parser:ensure_content_decoded(Content), - #content{properties = #'P_basic'{headers = Headers} = Props} = Content, + Content1 = rabbit_binary_parser:ensure_content_decoded(Content), + #content{properties = #'P_basic'{headers = Headers} = Props} = Content1, Headers1 = F(Headers), rabbit_binary_generator:clear_encoded_content( - Content#content{properties = Props#'P_basic'{headers = Headers1}}). + Content1#content{properties = Props#'P_basic'{headers = Headers1}}). indexof(L, Element) -> indexof(L, Element, 1). -- cgit v1.2.1 From d8ebe0e0c16c2b4b27ef57800a30cd9aadd3f625 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 3 Apr 2012 17:32:03 +0100 Subject: make dtree:take/3 cope with absent keys --- src/dtree.erl | 43 ++++++++++++++++++++++++------------------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/src/dtree.erl b/src/dtree.erl index 265bb340..30fa8794 100644 --- a/src/dtree.erl +++ b/src/dtree.erl @@ -26,9 +26,8 @@ %% %% Entries exists while they have a non-empty secondary key set. The %% 'take' operations return the entries that got removed, i.e. that -%% had no remaining secondary keys. take/3 expects entries to exist -%% with the supplied primary keys and secondary key. take/2 can cope -%% with the supplied secondary key having no entries. +%% had no remaining secondary keys. 'take' ignores missing supplied +%% keys. -module(dtree). @@ -75,17 +74,22 @@ insert(PK, SKs, V, {P, S}) -> end, S, SKs)}. take(PKs, SK, {P, S}) -> - {KVs, P1} = take2(PKs, SK, P), - PKS = gb_sets:difference(gb_trees:get(SK, S), gb_sets:from_list(PKs)), - {KVs, {P1, case gb_sets:is_empty(PKS) of - true -> gb_trees:delete(SK, S); - false -> gb_trees:update(SK, PKS, S) - end}}. + case gb_trees:lookup(SK, S) of + none -> {[], {P, S}}; + {value, PKS} -> TakenPKS = gb_sets:from_list(PKs), + PKSInter = gb_sets:intersection(PKS, TakenPKS), + PKSDiff = gb_sets:difference (PKS, TakenPKS), + {KVs, P1} = take2(PKSInter, SK, P), + {KVs, {P1, case gb_sets:is_empty(PKSDiff) of + true -> gb_trees:delete(SK, S); + false -> gb_trees:update(SK, PKSDiff, S) + end}} + end. take(SK, {P, S}) -> case gb_trees:lookup(SK, S) of none -> {[], {P, S}}; - {value, PKS} -> {KVs, P1} = take2(gb_sets:to_list(PKS), SK, P), + {value, PKS} -> {KVs, P1} = take2(PKS, SK, P), {KVs, {P1, gb_trees:delete(SK, S)}} end. @@ -100,12 +104,13 @@ size({P, _S}) -> gb_trees:size(P). %%---------------------------------------------------------------------------- -take2(PKs, SK, P) -> - lists:foldl(fun (PK, {KVs, P0}) -> - {SKS, V} = gb_trees:get(PK, P0), - SKS1 = gb_sets:delete(SK, SKS), - case gb_sets:is_empty(SKS1) of - true -> {[{PK, V} | KVs], gb_trees:delete(PK, P0)}; - false -> {KVs, gb_trees:update(PK, {SKS1, V}, P0)} - end - end, {[], P}, PKs). +take2(PKS, SK, P) -> + gb_sets:fold(fun (PK, {KVs, P0}) -> + {SKS, V} = gb_trees:get(PK, P0), + SKS1 = gb_sets:delete(SK, SKS), + case gb_sets:is_empty(SKS1) of + true -> KVs1 = [{PK, V} | KVs], + {KVs1, gb_trees:delete(PK, P0)}; + false -> {KVs, gb_trees:update(PK, {SKS1, V}, P0)} + end + end, {[], P}, PKS). -- cgit v1.2.1 From 363f7a6aad12282b2c87983145c00b0c2f7faad9 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 3 Apr 2012 18:40:40 +0100 Subject: fix nack handling in channel When a queue dies abnormally we nack all messages sent to that queue and still pending confirms. Any confirms coming in for these messages thereafter - from other queues - are ignored. --- src/dtree.erl | 27 ++++++++++++++++++++++++++- src/rabbit_channel.erl | 11 ++++++----- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/src/dtree.erl b/src/dtree.erl index 30fa8794..66a862cb 100644 --- a/src/dtree.erl +++ b/src/dtree.erl @@ -31,7 +31,7 @@ -module(dtree). --export([empty/0, insert/4, take/3, take/2, +-export([empty/0, insert/4, take/3, take/2, take_all/2, is_defined/2, is_empty/1, smallest/1, size/1]). %%---------------------------------------------------------------------------- @@ -51,6 +51,7 @@ -spec(insert/4 :: (pk(), [sk()], val(), ?MODULE()) -> ?MODULE()). -spec(take/3 :: ([pk()], sk(), ?MODULE()) -> {[kv()], ?MODULE()}). -spec(take/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}). +-spec(take_all/2 :: (sk(), ?MODULE()) -> {[kv()], ?MODULE()}). -spec(is_defined/2 :: (sk(), ?MODULE()) -> boolean()). -spec(is_empty/1 :: (?MODULE()) -> boolean()). -spec(smallest/1 :: (?MODULE()) -> kv()). @@ -93,6 +94,13 @@ take(SK, {P, S}) -> {KVs, {P1, gb_trees:delete(SK, S)}} end. +take_all(SK, {P, S}) -> + case gb_trees:lookup(SK, S) of + none -> {[], {P, S}}; + {value, PKS} -> {KVs, SKS, P1} = take_all2(PKS, P), + {KVs, {P1, prune(SKS, PKS, S)}} + end. + is_defined(SK, {_P, S}) -> gb_trees:is_defined(SK, S). is_empty({P, _S}) -> gb_trees:is_empty(P). @@ -114,3 +122,20 @@ take2(PKS, SK, P) -> false -> {KVs, gb_trees:update(PK, {SKS1, V}, P0)} end end, {[], P}, PKS). + +take_all2(PKS, P) -> + gb_sets:fold(fun (PK, {KVs, SKS0, P0}) -> + {SKS, V} = gb_trees:get(PK, P0), + {[{PK, V} | KVs], gb_sets:union(SKS, SKS0), + gb_trees:delete(PK, P0)} + end, {[], gb_sets:empty(), P}, PKS). + +prune(SKS, PKS, S) -> + gb_sets:fold(fun (SK0, S0) -> + PKS1 = gb_trees:get(SK0, S0), + PKS2 = gb_sets:difference(PKS1, PKS), + case gb_sets:is_empty(PKS2) of + true -> gb_trees:delete(SK0, S0); + false -> gb_trees:update(SK0, PKS2, S0) + end + end, S, SKS). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 4a0e93be..0c1c11d8 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1116,11 +1116,12 @@ monitor_queue(QPid, State = #ch{queue_monitors = QMons}) -> end. handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) -> - {MXs, UC1} = dtree:take(QPid, UC), - (case rabbit_misc:is_abnormal_termination(Reason) of - true -> fun send_nacks/2; - false -> fun record_confirms/2 - end)(MXs, State#ch{unconfirmed = UC1}). + case rabbit_misc:is_abnormal_termination(Reason) of + true -> {MXs, UC1} = dtree:take_all(QPid, UC), + send_nacks(MXs, State#ch{unconfirmed = UC1}); + false -> {MXs, UC1} = dtree:take(QPid, UC), + record_confirms(MXs, State#ch{unconfirmed = UC1}) + end. handle_consuming_queue_down(QPid, State = #ch{consumer_mapping = ConsumerMapping, -- cgit v1.2.1 From 06059ad7b6b55d267aeb43af851a3bced312e70b Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 3 Apr 2012 20:15:36 +0100 Subject: revise nack handling in queue (for dlx) - with the changes introduced in bug 24750, the "lost" message count logged was misleading since it would not include any messages still pending confirmation from other queues. Fix that. - normal queue terminations are not log worthy - splitting log information across two entries loses context, i.e. other log entries might appear inbetween. So now we collate all the information into a single log entry. - saying that messages were "lost" is misleading - we simply don't know what happened to them. Make this clear(er). --- src/rabbit_amqqueue_process.erl | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 19e1736a..21e576a6 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -757,15 +757,14 @@ handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons, error -> noreply(State); {ok, _} -> - rabbit_log:info("DLQ ~p (for ~s) died~n", - [QPid, rabbit_misc:rs(qname(State))]), - {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC), - case (MsgSeqNoAckTags =/= [] andalso - rabbit_misc:is_abnormal_termination(Reason)) of - true -> rabbit_log:warning("Dead queue lost ~p messages~n", - [length(MsgSeqNoAckTags)]); + case rabbit_misc:is_abnormal_termination(Reason) of + true -> {Lost, _UC1} = dtree:take_all(QPid, UC), + rabbit_log:warning( + "DLQ ~p for ~s died with ~p unconfirmed messages~n", + [QPid, rabbit_misc:rs(qname(State)), length(Lost)]); false -> ok end, + {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC), cleanup_after_confirm( [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], State#q{queue_monitors = dict:erase(QPid, QMons), -- cgit v1.2.1 From 88cd0c8f6beb378d7b564ee0c2a1eaf325ae56dd Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Wed, 4 Apr 2012 17:45:43 +0100 Subject: better(?) dtree docs --- src/dtree.erl | 37 ++++++++++++++++++++++++++----------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/src/dtree.erl b/src/dtree.erl index 66a862cb..473b4283 100644 --- a/src/dtree.erl +++ b/src/dtree.erl @@ -16,18 +16,19 @@ %% A dual-index tree. %% -%% Conceptually, what we want is a map that has two distinct sets of -%% keys (referred to here as primary and secondary, although that -%% shouldn't imply a hierarchy) pointing to one set of -%% values. However, in practice what we'll always want to do is insert -%% a value that's pointed at by (one primary, many secondaries) and -%% remove values that are pointed at by (one secondary, many -%% primaries) or (one secondary, all primaries). Thus the API. +%% Rntries have the following shape: %% -%% Entries exists while they have a non-empty secondary key set. The -%% 'take' operations return the entries that got removed, i.e. that -%% had no remaining secondary keys. 'take' ignores missing supplied -%% keys. +%% +----+--------------------+---+ +%% | PK | SK1, SK2, ..., SKN | V | +%% +----+--------------------+---+ +%% +%% i.e. a primary key, set of secondary keys, and a value. +%% +%% There can be only one entry per primary key, but secondary keys may +%% appear in multiple entries. +%% +%% The set of secondary keys must be non-empty. Or, to put it another +%% way, entries only exist while their secondary key set is non-empty. -module(dtree). @@ -63,6 +64,8 @@ empty() -> {gb_trees:empty(), gb_trees:empty()}. +%% Insert an entry. Fails if there already is an entry with the given +%% primary key. The list of secondary keys should be non-empty. insert(PK, SKs, V, {P, S}) -> {gb_trees:insert(PK, {gb_sets:from_list(SKs), V}, P), lists:foldl(fun (SK, S0) -> @@ -74,6 +77,11 @@ insert(PK, SKs, V, {P, S}) -> end end, S, SKs)}. +%% Remove the given secondary key from the entries of the given +%% primary keys, returning the primary-key/value pairs of any entries +%% that were dropped as the result (i.e. due to their secondary key +%% set becoming empty). It is ok for the given primary keys and/or +%% secondary key to not exist. take(PKs, SK, {P, S}) -> case gb_trees:lookup(SK, S) of none -> {[], {P, S}}; @@ -87,6 +95,10 @@ take(PKs, SK, {P, S}) -> end}} end. +%% Remove the given secondary key from all entries, returning the +%% primary-key/value pairs of any entries that were dropped as the +%% result (i.e. due to their secondary key set becoming empty). It is +%% ok for the given secondary key to not exist. take(SK, {P, S}) -> case gb_trees:lookup(SK, S) of none -> {[], {P, S}}; @@ -94,6 +106,9 @@ take(SK, {P, S}) -> {KVs, {P1, gb_trees:delete(SK, S)}} end. +%% Drop all entries which contain the given secondary key, returning +%% the primary-key/value pairs of these entries. It is ok for the +%% given secondary key to not exist. take_all(SK, {P, S}) -> case gb_trees:lookup(SK, S) of none -> {[], {P, S}}; -- cgit v1.2.1 From e3ab28cb531122fb8853ae081e8811b85412669c Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 4 Apr 2012 18:10:28 +0100 Subject: Whoopsy. --- src/rabbit_mirror_queue_slave.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 98a80a26..0a51bcaa 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -846,7 +846,7 @@ process_instruction({ack, MsgIds}, process_instruction({fold, MsgFun, AckTags}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> - BQS1 = BQ:fold(AckTags, MsgFun, BQS), + BQS1 = BQ:fold(MsgFun, BQS, AckTags), {ok, State #state { backing_queue_state = BQS1 }}; process_instruction({requeue, MsgIds}, State = #state { backing_queue = BQ, -- cgit v1.2.1 From 20cc571cbc15e555420362086315f0035c8efe7b Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 5 Apr 2012 17:17:38 +0100 Subject: dtree API cleanup: handle 'insert' w empty secondary key list correctly --- src/dtree.erl | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/dtree.erl b/src/dtree.erl index 473b4283..e88ad963 100644 --- a/src/dtree.erl +++ b/src/dtree.erl @@ -65,7 +65,11 @@ empty() -> {gb_trees:empty(), gb_trees:empty()}. %% Insert an entry. Fails if there already is an entry with the given -%% primary key. The list of secondary keys should be non-empty. +%% primary key. +insert(PK, [], V, {P, S}) -> + %% dummy insert to force error if PK exists + gb_trees:insert(PK, {gb_sets:empty(), V}, P), + {P, S}; insert(PK, SKs, V, {P, S}) -> {gb_trees:insert(PK, {gb_sets:from_list(SKs), V}, P), lists:foldl(fun (SK, S0) -> -- cgit v1.2.1 From a3c3c87c0621c8969e68d1b8b78da7c656f21a8f Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 5 Apr 2012 18:20:15 +0100 Subject: typo --- src/dtree.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dtree.erl b/src/dtree.erl index e88ad963..67bbbc1b 100644 --- a/src/dtree.erl +++ b/src/dtree.erl @@ -16,7 +16,7 @@ %% A dual-index tree. %% -%% Rntries have the following shape: +%% Entries have the following shape: %% %% +----+--------------------+---+ %% | PK | SK1, SK2, ..., SKN | V | -- cgit v1.2.1 From 478acb61ef5589efa45622a278dc3809d1f2e70f Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Tue, 10 Apr 2012 10:26:13 +0100 Subject: Restore rabbit_basic:extract_headers/1 --- src/rabbit_basic.erl | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index cc876cb4..8ad59016 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -20,7 +20,7 @@ -export([publish/4, publish/6, publish/1, message/3, message/4, properties/1, append_table_header/3, - map_headers/2, delivery/4, header_routes/1]). + extract_headers/1, map_headers/2, delivery/4, header_routes/1]). -export([build_content/2, from_content/1]). %%---------------------------------------------------------------------------- @@ -61,6 +61,8 @@ -spec(append_table_header/3 :: (binary(), rabbit_framing:amqp_table(), headers()) -> headers()). +-spec(extract_headers/1 :: (rabbit_types:content()) -> headers()). + -spec(map_headers/2 :: (rabbit_types:content(), fun((headers()) -> headers())) -> rabbit_types:content()). @@ -186,6 +188,11 @@ append_table_header(Name, Info, Headers) -> end, rabbit_misc:set_table_value(Headers, Name, array, [{table, Info} | Prior]). +extract_headers(Content) -> + #content{properties = #'P_basic'{headers = Headers}} = + rabbit_binary_parser:ensure_content_decoded(Content), + Headers. + map_headers(F, Content) -> Content1 = rabbit_binary_parser:ensure_content_decoded(Content), #content{properties = #'P_basic'{headers = Headers} = Props} = Content1, -- cgit v1.2.1 From f1719addbe1c0d7e0f9fbcf2ce8445a2841e80c8 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 10 Apr 2012 21:19:46 +0100 Subject: optimisation: gm:broadcast earlier so other nodes can get on with work --- src/rabbit_mirror_queue_master.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index bfdab487..cd9dabc4 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -179,9 +179,9 @@ dropwhile(Pred, MsgFun, backing_queue_state = BQS }) -> Len = BQ:len(BQS), BQS1 = BQ:dropwhile(Pred, MsgFun, BQS), + ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}), Dropped = Len - BQ:len(BQS1), SetDelivered1 = lists:max([0, SetDelivered - Dropped]), - ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}), State #state { backing_queue_state = BQS1, set_delivered = SetDelivered1 }. @@ -241,11 +241,11 @@ ack(AckTags, State = #state { gm = GM, backing_queue_state = BQS, ack_msg_id = AM }) -> {MsgIds, BQS1} = BQ:ack(AckTags, BQS), - AM1 = lists:foldl(fun dict:erase/2, AM, AckTags), case MsgIds of [] -> ok; _ -> ok = gm:broadcast(GM, {ack, MsgIds}) end, + AM1 = lists:foldl(fun dict:erase/2, AM, AckTags), {MsgIds, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 }}. -- cgit v1.2.1 From fe8e5bf03098a9393ad0f1db433cf97cea4d2fc2 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 10 Apr 2012 21:26:10 +0100 Subject: cosmetic --- src/rabbit_mirror_queue_master.erl | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index cd9dabc4..5db0fa2f 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -177,10 +177,11 @@ dropwhile(Pred, MsgFun, backing_queue = BQ, set_delivered = SetDelivered, backing_queue_state = BQS }) -> - Len = BQ:len(BQS), + Len = BQ:len(BQS), BQS1 = BQ:dropwhile(Pred, MsgFun, BQS), - ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}), - Dropped = Len - BQ:len(BQS1), + Len1 = BQ:len(BQS1), + ok = gm:broadcast(GM, {set_length, Len1}), + Dropped = Len - Len1, SetDelivered1 = lists:max([0, SetDelivered - Dropped]), State #state { backing_queue_state = BQS1, set_delivered = SetDelivered1 }. -- cgit v1.2.1