summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-02-01 17:40:59 +0000
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-02-01 17:40:59 +0000
commite0fcc125297062763771da0e5b6208307d7fa62f (patch)
tree5f643c0c368a22e4782977f892f364b1409e0739
parent2ae51cb5ef9251d63f8864ad606b6630141c64c3 (diff)
downloadrabbitmq-server-e0fcc125297062763771da0e5b6208307d7fa62f.tar.gz
Cosmetic, restored reject/4 arguments to the previous order.
-rw-r--r--src/file_handle_cache.erl8
-rw-r--r--src/rabbit_channel.erl15
-rw-r--r--src/rabbit_queue_index.erl4
-rw-r--r--src/rabbit_variable_queue.erl33
4 files changed, 40 insertions, 20 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index c11fb54b..bbf50d32 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -149,6 +149,7 @@
-export([obtain/0, release/0, transfer/1, set_limit/1, get_limit/0, info_keys/0,
info/0, info/1]).
-export([ulimit/0]).
+-export([needs_sync/1]).
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, prioritise_cast/2]).
@@ -373,6 +374,13 @@ sync(Ref) ->
end
end).
+needs_sync(Ref) ->
+ with_flushed_handles(
+ [Ref],
+ fun ([#handle { is_dirty = false, write_buffer = [] }]) -> false;
+ (_) -> true
+ end).
+
position(Ref, NewOffset) ->
with_flushed_handles(
[Ref],
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index d6c9a51c..50e9e49a 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -679,7 +679,7 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
multiple = Multiple,
requeue = Requeue},
_, State) ->
- reject(DeliveryTag, Multiple, Requeue, State);
+ reject(DeliveryTag, Requeue, Multiple, State);
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
@@ -880,7 +880,7 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
handle_method(#'basic.reject'{delivery_tag = DeliveryTag,
requeue = Requeue},
_, State) ->
- reject(DeliveryTag, false, Requeue, State);
+ reject(DeliveryTag, Requeue, false, State);
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
type = TypeNameBin,
@@ -1084,10 +1084,11 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
rabbit_misc:protocol_error(
precondition_failed, "channel is not transactional", []);
-handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ,
- uncommitted_acks = TAL,
- uncommitted_nacks = TNL,
- limiter = Limiter}) ->
+handle_method(#'tx.commit'{}, _,
+ State = #ch{uncommitted_message_q = TMQ,
+ uncommitted_acks = TAL,
+ uncommitted_nacks = TNL,
+ limiter = Limiter}) ->
State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ),
ack(TAL, State1),
lists:foreach(
@@ -1266,7 +1267,7 @@ basic_return(#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey},
Content).
-reject(DeliveryTag, Multiple, Requeue,
+reject(DeliveryTag, Requeue, Multiple,
State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
State1 = State#ch{unacked_message_q = Remaining},
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index f03c1d1c..434f28d4 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -22,6 +22,7 @@
next_segment_boundary/1, bounds/1, recover/1]).
-export([add_queue_ttl/0]).
+-export([needs_sync/1]).
-define(CLEAN_FILENAME, "clean.dot").
@@ -298,6 +299,9 @@ sync(SeqIds, State) ->
%% seqids not being in the journal.
sync_if([] =/= SeqIds, State).
+needs_sync(#qistate { journal_handle = JournalHdl }) ->
+ file_handle_cache:needs_sync(JournalHdl).
+
flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State) -> flush_journal(State).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 9b45b558..8407bebf 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -731,21 +731,28 @@ ram_duration(State = #vqstate {
ram_msg_count_prev = RamMsgCount,
ram_ack_count_prev = RamAckCount }}.
-needs_timeout(State) ->
- case needs_index_sync(State) of
- false -> case reduce_memory_use(
- fun (_Quota, State1) -> {0, State1} end,
- fun (_Quota, State1) -> State1 end,
- fun (_Quota, State1) -> {0, State1} end,
- State) of
- {true, _State} -> idle;
- {false, _State} -> false
- end;
- true -> timed
+needs_timeout(State = #vqstate { index_state = IndexState }) ->
+ case rabbit_queue_index:needs_sync(IndexState) of
+ true ->
+ timed;
+ false ->
+ case needs_index_sync(State) of
+ false -> case reduce_memory_use(
+ fun (_Quota, State1) -> {0, State1} end,
+ fun (_Quota, State1) -> State1 end,
+ fun (_Quota, State1) -> {0, State1} end,
+ State) of
+ {true, _State} -> idle;
+ {false, _State} -> false
+ end;
+ true -> timed
+ end
end.
-timeout(State) ->
- a(reduce_memory_use(confirm_commit_index(State))).
+timeout(State = #vqstate { index_state = IndexState }) ->
+ State1 = State #vqstate {
+ index_state = rabbit_queue_index:sync(IndexState) },
+ a(reduce_memory_use(State1)).
handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.