summaryrefslogtreecommitdiff
path: root/src/rabbit_backing_queue.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_backing_queue.erl')
-rw-r--r--src/rabbit_backing_queue.erl75
1 files changed, 47 insertions, 28 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 5e13bc58..2f247448 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -18,25 +18,22 @@
-ifdef(use_specs).
--export_type([async_callback/0]).
-
%% We can't specify a per-queue ack/state with callback signatures
-type(ack() :: any()).
-type(state() :: any()).
-type(msg_ids() :: [rabbit_types:msg_id()]).
-type(fetch_result(Ack) ::
- ('empty' |
- %% Message, IsDelivered, AckTag, Remaining_Len
- {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})).
+ ('empty' | {rabbit_types:basic_message(), boolean(), Ack})).
+-type(drop_result(Ack) ::
+ ('empty' | {rabbit_types:msg_id(), Ack})).
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
-type(async_callback() ::
fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
-type(duration() :: ('undefined' | 'infinity' | number())).
--type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') |
- 'undefined').
+-type(msg_fun(A) :: fun ((rabbit_types:basic_message(), ack(), A) -> A)).
-type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())).
%% Called on startup with a list of durable queue names. The queues
@@ -72,14 +69,18 @@
%% content.
-callback delete_and_terminate(any(), state()) -> state().
-%% Remove all messages in the queue, but not messages which have been
-%% fetched and are pending acks.
+%% Remove all 'fetchable' messages from the queue, i.e. all messages
+%% except those that have been fetched already and are pending acks.
-callback purge(state()) -> {purged_msg_count(), state()}.
+%% Remove all messages in the queue which have been fetched and are
+%% pending acks.
+-callback purge_acks(state()) -> state().
+
%% Publish a message.
-callback publish(rabbit_types:basic_message(),
- rabbit_types:message_properties(), pid(), state()) ->
- state().
+ rabbit_types:message_properties(), boolean(), pid(),
+ state()) -> state().
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
@@ -124,33 +125,50 @@
%% be ignored.
-callback drain_confirmed(state()) -> {msg_ids(), state()}.
-%% Drop messages from the head of the queue while the supplied predicate returns
-%% true. Also accepts a boolean parameter that determines whether the messages
-%% necessitate an ack or not. If they do, the function returns a list of
-%% messages with the respective acktags.
--callback dropwhile(msg_pred(), true, state())
- -> {rabbit_types:message_properties() | undefined,
- [{rabbit_types:basic_message(), ack()}], state()};
- (msg_pred(), false, state())
- -> {rabbit_types:message_properties() | undefined,
- undefined, state()}.
+%% Drop messages from the head of the queue while the supplied
+%% predicate on message properties returns true. Returns the first
+%% message properties for which the predictate returned false, or
+%% 'undefined' if the whole backing queue was traversed w/o the
+%% predicate ever returning false.
+-callback dropwhile(msg_pred(), state())
+ -> {rabbit_types:message_properties() | undefined, state()}.
+
+%% Like dropwhile, except messages are fetched in "require
+%% acknowledgement" mode and are passed, together with their ack tag,
+%% to the supplied function. The function is also fed an
+%% accumulator. The result of fetchwhile is as for dropwhile plus the
+%% accumulator.
+-callback fetchwhile(msg_pred(), msg_fun(A), A, state())
+ -> {rabbit_types:message_properties() | undefined,
+ A, state()}.
%% Produce the next message.
-callback fetch(true, state()) -> {fetch_result(ack()), state()};
(false, state()) -> {fetch_result(undefined), state()}.
+%% Remove the next message.
+-callback drop(true, state()) -> {drop_result(ack()), state()};
+ (false, state()) -> {drop_result(undefined), state()}.
+
%% Acktags supplied are for messages which can now be forgotten
%% about. Must return 1 msg_id per Ack, in the same order as Acks.
-callback ack([ack()], state()) -> {msg_ids(), state()}.
-%% Acktags supplied are for messages which should be processed. The
-%% provided callback function is called with each message.
--callback fold(msg_fun(), state(), [ack()]) -> state().
-
%% Reinsert messages into the queue which have already been delivered
%% and were pending acknowledgement.
-callback requeue([ack()], state()) -> {msg_ids(), state()}.
+%% Fold over messages by ack tag. The supplied function is called with
+%% each message, its ack tag, and an accumulator.
+-callback ackfold(msg_fun(A), A, state(), [ack()]) -> {A, state()}.
+
+%% Fold over all the messages in a queue and return the accumulated
+%% results, leaving the queue undisturbed.
+-callback fold(fun((rabbit_types:basic_message(),
+ rabbit_types:message_properties(),
+ boolean(), A) -> {('stop' | 'cont'), A}),
+ A, state()) -> {A, state()}.
+
%% How long is my queue?
-callback len(state()) -> non_neg_integer().
@@ -210,9 +228,10 @@
behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
- {delete_and_terminate, 2}, {purge, 1}, {publish, 4},
- {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 3},
- {fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1},
+ {delete_and_terminate, 2}, {purge, 1}, {purge_acks, 1}, {publish, 5},
+ {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1},
+ {dropwhile, 2}, {fetchwhile, 4},
+ {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
{ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
{handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}] ;