diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-08-19 17:14:13 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-08-19 17:14:13 +0100 |
commit | 11049881a87eb51e9bf6efbb4d2ef1ee4be62bfe (patch) | |
tree | 2a3f21103e1d6050802ed32714d1e62763aeb0a5 /src/rabbit_backing_queue.erl | |
parent | bd1305279e255adcf583afdd55a7cee18a9fcddb (diff) | |
parent | af4ef7640e817141615298c504e9129d14be1d9d (diff) | |
download | rabbitmq-server-bug24969.tar.gz |
Merge defaultbug24969
Diffstat (limited to 'src/rabbit_backing_queue.erl')
-rw-r--r-- | src/rabbit_backing_queue.erl | 121 |
1 files changed, 71 insertions, 50 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index dc144a0e..61b504bc 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -10,8 +10,8 @@ %% %% The Original Code is RabbitMQ. %% -%% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved. %% -module(rabbit_backing_queue). @@ -22,17 +22,18 @@ -type(ack() :: any()). -type(state() :: any()). +-type(msg_ids() :: [rabbit_types:msg_id()]). -type(fetch_result(Ack) :: - ('empty' | - %% Message, IsDelivered, AckTag, Remaining_Len - {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})). + ('empty' | {rabbit_types:basic_message(), boolean(), Ack})). +-type(drop_result(Ack) :: + ('empty' | {rabbit_types:msg_id(), Ack})). -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). --type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). +-type(async_callback() :: + fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). -type(duration() :: ('undefined' | 'infinity' | number())). --type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') | - 'undefined'). +-type(msg_fun(A) :: fun ((rabbit_types:basic_message(), ack(), A) -> A)). -type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())). %% Called on startup with a list of durable queue names. The queues @@ -68,24 +69,29 @@ %% content. -callback delete_and_terminate(any(), state()) -> state(). -%% Remove all messages in the queue, but not messages which have been -%% fetched and are pending acks. +%% Remove all 'fetchable' messages from the queue, i.e. all messages +%% except those that have been fetched already and are pending acks. -callback purge(state()) -> {purged_msg_count(), state()}. +%% Remove all messages in the queue which have been fetched and are +%% pending acks. +-callback purge_acks(state()) -> state(). + %% Publish a message. -callback publish(rabbit_types:basic_message(), - rabbit_types:message_properties(), pid(), state()) -> - state(). + rabbit_types:message_properties(), boolean(), pid(), + state()) -> state(). %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls %% (i.e. saves the round trip through the backing queue). --callback publish_delivered(true, rabbit_types:basic_message(), - rabbit_types:message_properties(), pid(), state()) - -> {ack(), state()}; - (false, rabbit_types:basic_message(), +-callback publish_delivered(rabbit_types:basic_message(), rabbit_types:message_properties(), pid(), state()) - -> {undefined, state()}. + -> {ack(), state()}. + +%% Called to inform the BQ about messages which have reached the +%% queue, but are not going to be further passed to BQ. +-callback discard(rabbit_types:msg_id(), pid(), state()) -> state(). %% Return ids of messages which have been confirmed since the last %% invocation of this function (or initialisation). @@ -114,32 +120,51 @@ %% first time the message id appears in the result of %% drain_confirmed. All subsequent appearances of that message id will %% be ignored. --callback drain_confirmed(state()) -> {[rabbit_guid:guid()], state()}. - -%% Drop messages from the head of the queue while the supplied predicate returns -%% true. Also accepts a boolean parameter that determines whether the messages -%% necessitate an ack or not. If they do, the function returns a list of -%% messages with the respective acktags. --callback dropwhile(msg_pred(), true, state()) - -> {[{rabbit_types:basic_message(), ack()}], state()}; - (msg_pred(), false, state()) - -> {undefined, state()}. +-callback drain_confirmed(state()) -> {msg_ids(), state()}. + +%% Drop messages from the head of the queue while the supplied +%% predicate on message properties returns true. Returns the first +%% message properties for which the predictate returned false, or +%% 'undefined' if the whole backing queue was traversed w/o the +%% predicate ever returning false. +-callback dropwhile(msg_pred(), state()) + -> {rabbit_types:message_properties() | undefined, state()}. + +%% Like dropwhile, except messages are fetched in "require +%% acknowledgement" mode and are passed, together with their ack tag, +%% to the supplied function. The function is also fed an +%% accumulator. The result of fetchwhile is as for dropwhile plus the +%% accumulator. +-callback fetchwhile(msg_pred(), msg_fun(A), A, state()) + -> {rabbit_types:message_properties() | undefined, + A, state()}. %% Produce the next message. -callback fetch(true, state()) -> {fetch_result(ack()), state()}; (false, state()) -> {fetch_result(undefined), state()}. +%% Remove the next message. +-callback drop(true, state()) -> {drop_result(ack()), state()}; + (false, state()) -> {drop_result(undefined), state()}. + %% Acktags supplied are for messages which can now be forgotten %% about. Must return 1 msg_id per Ack, in the same order as Acks. --callback ack([ack()], state()) -> {[rabbit_guid:guid()], state()}. - -%% Acktags supplied are for messages which should be processed. The -%% provided callback function is called with each message. --callback fold(msg_fun(), state(), [ack()]) -> state(). +-callback ack([ack()], state()) -> {msg_ids(), state()}. %% Reinsert messages into the queue which have already been delivered %% and were pending acknowledgement. --callback requeue([ack()], state()) -> {[rabbit_guid:guid()], state()}. +-callback requeue([ack()], state()) -> {msg_ids(), state()}. + +%% Fold over messages by ack tag. The supplied function is called with +%% each message, its ack tag, and an accumulator. +-callback ackfold(msg_fun(A), A, state(), [ack()]) -> {A, state()}. + +%% Fold over all the messages in a queue and return the accumulated +%% results, leaving the queue undisturbed. +-callback fold(fun((rabbit_types:basic_message(), + rabbit_types:message_properties(), + boolean(), A) -> {('stop' | 'cont'), A}), + A, state()) -> {A, state()}. %% How long is my queue? -callback len(state()) -> non_neg_integer(). @@ -147,6 +172,9 @@ %% Is my queue empty? -callback is_empty(state()) -> boolean(). +%% What's the queue depth, where depth = length + number of pending acks +-callback depth(state()) -> non_neg_integer(). + %% For the next three functions, the assumption is that you're %% monitoring something like the ingress and egress rates of the %% queue. The RAM duration is thus the length of time represented by @@ -185,18 +213,10 @@ -callback invoke(atom(), fun ((atom(), A) -> A), state()) -> state(). %% Called prior to a publish or publish_delivered call. Allows the BQ -%% to signal that it's already seen this message (and in what capacity -%% - i.e. was it published previously or discarded previously) and -%% thus the message should be dropped. +%% to signal that it's already seen this message, (e.g. it was published +%% or discarded previously) and thus the message should be dropped. -callback is_duplicate(rabbit_types:basic_message(), state()) - -> {'false'|'published'|'discarded', state()}. - -%% Called to inform the BQ about messages which have reached the -%% queue, but are not going to be further passed to BQ for some -%% reason. Note that this is may be invoked for messages for which -%% BQ:is_duplicate/2 has already returned {'published' | 'discarded', -%% BQS}. --callback discard(rabbit_types:basic_message(), pid(), state()) -> state(). + -> {boolean(), state()}. -else. @@ -204,12 +224,13 @@ behaviour_info(callbacks) -> [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2}, - {delete_and_terminate, 2}, {purge, 1}, {publish, 4}, - {publish_delivered, 5}, {drain_confirmed, 1}, {dropwhile, 3}, - {fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1}, - {is_empty, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, - {needs_timeout, 1}, {timeout, 1}, {handle_pre_hibernate, 1}, - {status, 1}, {invoke, 3}, {is_duplicate, 2}, {discard, 3}]; + {delete_and_terminate, 2}, {purge, 1}, {purge_acks, 1}, {publish, 5}, + {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, + {dropwhile, 2}, {fetchwhile, 4}, + {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1}, + {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, + {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, + {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ; behaviour_info(_Other) -> undefined. |