From 6a533c6fef3a15e07b97563c2e978e9e3e9d2a84 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Wed, 18 Aug 2010 14:46:38 +0100 Subject: added validation for x-message-ttl arg on queue declare --- src/rabbit_amqqueue.erl | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 2453280e..20f53ed4 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -57,6 +57,7 @@ -include_lib("stdlib/include/qlc.hrl"). -define(EXPIRES_TYPE, long). +-define(TTL_TYPE, long). %%---------------------------------------------------------------------------- @@ -307,7 +308,8 @@ check_declare_arguments(QueueName, Args) -> precondition_failed, "invalid arg '~s' for ~s: ~w", [Key, rabbit_misc:rs(QueueName), Error]) - end || {Key, Fun} <- [{<<"x-expires">>, fun check_expires_argument/1}]], + end || {Key, Fun} <- [{<<"x-expires">>, fun check_expires_argument/1}, + {<<"x-message-ttl">>, fun check_message_ttl_argument/1}]], ok. check_expires_argument(undefined) -> @@ -320,6 +322,16 @@ check_expires_argument({?EXPIRES_TYPE, _Expires}) -> check_expires_argument(_) -> {error, expires_not_of_type_long}. +check_message_ttl_argument(undefined) -> + ok; +check_message_ttl_argument({?TTL_TYPE, TTL}) + when is_integer(TTL) andalso TTL > 0 -> + ok; +check_message_ttl_argument({?TTL_TYPE, _TTL}) -> + {error, ttl_zero_or_less}; +check_message_ttl_argument(_) -> + {error, ttl_not_of_type_long}. + list(VHostPath) -> mnesia:dirty_match_object( rabbit_queue, -- cgit v1.2.1 From ff44ab3ca3312487474e8c076859319f367e22ce Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Wed, 18 Aug 2010 14:49:48 +0100 Subject: added msg_properties record for rabbit.hrl --- include/rabbit.hrl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/include/rabbit.hrl b/include/rabbit.hrl index b9abd788..baa2d721 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -74,6 +74,8 @@ -record(event, {type, props, timestamp}). +-record(msg_properties, {ttl}). + %%---------------------------------------------------------------------------- -define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd."). -- cgit v1.2.1 From 8b308948d95ab54c14d48a2e437f52d430865ba5 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Wed, 18 Aug 2010 23:10:13 +0100 Subject: reworked backing_queue and variable_queue to store msg_properties. used msg properties for storing and manipulating message expiry --- include/rabbit.hrl | 2 +- src/rabbit_amqqueue_process.erl | 42 ++++++++++++++++++++++++----- src/rabbit_backing_queue.erl | 8 +++--- src/rabbit_variable_queue.erl | 60 +++++++++++++++++++++++------------------ 4 files changed, 75 insertions(+), 37 deletions(-) diff --git a/include/rabbit.hrl b/include/rabbit.hrl index baa2d721..be0d4b82 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -74,7 +74,7 @@ -record(event, {type, props, timestamp}). --record(msg_properties, {ttl}). +-record(msg_properties, {expiry}). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d52660c5..2cee51f7 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -39,6 +39,8 @@ -define(SYNC_INTERVAL, 5). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). +-define(BASE_MSG_PROPERTIES, #msg_properties{expiry = undefined}). + -export([start_link/1, info_keys/0]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, @@ -60,7 +62,8 @@ sync_timer_ref, rate_timer_ref, expiry_timer_ref, - stats_timer + stats_timer, + ttl }). -record(consumer, {tag, ack_required}). @@ -122,6 +125,7 @@ init(Q) -> sync_timer_ref = undefined, rate_timer_ref = undefined, expiry_timer_ref = undefined, + ttl = undefined, stats_timer = rabbit_event:init_stats_timer()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -144,12 +148,21 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- +init_queue_state(State) -> + init_expires(init_ttl(State)). + init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) -> case rabbit_misc:table_lookup(Arguments, <<"x-expires">>) of {long, Expires} -> ensure_expiry_timer(State#q{expires = Expires}); undefined -> State end. +init_ttl(State = #q{q = #amqqueue{arguments = Arguments}}) -> + case rabbit_misc:table_lookup(Arguments, <<"x-message-ttl">>) of + {long, Ttl} -> State#q{ttl=Ttl}; + undefined -> State + end. + declare(Recover, From, State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable}, backing_queue = BQ, backing_queue_state = undefined}) -> @@ -167,7 +180,7 @@ declare(Recover, From, queue_created, [{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]), - noreply(init_expires(State#q{backing_queue_state = BQS})); + noreply(init_queue_state(State#q{backing_queue_state = BQS})); Q1 -> {stop, normal, {existing, Q1}, State} end. @@ -418,7 +431,8 @@ attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> record_current_channel_tx(ChPid, Txn), - {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}. + MsgProperties = new_msg_properties(State), + {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, MsgProperties, BQS)}}. deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> case attempt_delivery(Txn, ChPid, Message, State) of @@ -426,13 +440,15 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> {true, NewState}; {false, NewState} -> %% Txn is none and no unblocked channels with consumers - BQS = BQ:publish(Message, State #q.backing_queue_state), + MsgProperties = new_msg_properties(State), + BQS = BQ:publish(Message, MsgProperties, State #q.backing_queue_state), {false, NewState#q{backing_queue_state = BQS}} end. requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> + MsgPropsFun = reset_msg_expiry_fun(State), maybe_run_queue_via_backing_queue( - fun (BQS) -> BQ:requeue(AckTags, BQS) end, State). + fun (BQS) -> BQ:requeue(AckTags, MsgPropsFun, BQS) end, State). add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). @@ -530,7 +546,7 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {AckTags, BQS1} = - BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, BQS), + BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, reset_msg_expiry_fun(State), BQS), %% ChPid must be known here because of the participant management %% by the channel. C = #cr{acktags = ChAckTags} = lookup_ch(ChPid), @@ -549,6 +565,20 @@ rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ, subtract_acks(A, B) when is_list(B) -> lists:foldl(fun sets:del_element/2, A, B). +reset_msg_expiry_fun(State) -> + fun(MsgProps) -> + MsgProps#msg_properties{expiry=calculate_msg_expiry(State)} + end. + +new_msg_properties(State) -> + #msg_properties{expiry = calculate_msg_expiry(State)}. + +calculate_msg_expiry(_State = #q{ttl = undefined}) -> + undefined; +calculate_msg_expiry(_State = #q{ttl = Ttl}) -> + Now = timer:now_diff(now(), {0,0,0}), + Now - (Ttl * 1000). + infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(name, #q{q = #amqqueue{name = Name}}) -> Name; diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 2230c507..cc7f8571 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -62,7 +62,7 @@ behaviour_info(callbacks) -> {purge, 1}, %% Publish a message. - {publish, 2}, + {publish, 3}, %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls @@ -77,7 +77,7 @@ behaviour_info(callbacks) -> {ack, 2}, %% A publish, but in the context of a transaction. - {tx_publish, 3}, + {tx_publish, 4}, %% Acks, but in the context of a transaction. {tx_ack, 3}, @@ -89,11 +89,11 @@ behaviour_info(callbacks) -> %% Commit a transaction. The Fun passed in must be called once %% the messages have really been commited. This CPS permits the %% possibility of commit coalescing. - {tx_commit, 3}, + {tx_commit, 4}, %% Reinsert messages into the queue which have already been %% delivered and were pending acknowledgement. - {requeue, 2}, + {requeue, 3}, %% How long is my queue? {len, 1}, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 0f52eee8..e0bc75b7 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -32,9 +32,9 @@ -module(rabbit_variable_queue). -export([init/3, terminate/1, delete_and_terminate/1, - purge/1, publish/2, publish_delivered/3, fetch/2, ack/2, - tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, - requeue/2, len/1, is_empty/1, + purge/1, publish/3, publish_delivered/3, fetch/2, ack/2, + tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, + requeue/3, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, status/1]). @@ -248,7 +248,8 @@ is_persistent, is_delivered, msg_on_disk, - index_on_disk + index_on_disk, + msg_properties }). -record(delta, @@ -490,8 +491,8 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) -> ram_index_count = 0, persistent_count = 0 })}. -publish(Msg, State) -> - {_SeqId, State1} = publish(Msg, false, false, State), +publish(Msg, MsgProperties, State) -> + {_SeqId, State1} = publish(Msg, MsgProperties, false, false, State), a(reduce_memory_use(State1)). publish_delivered(false, _Msg, State = #vqstate { len = 0 }) -> @@ -505,7 +506,7 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, pending_ack = PA, durable = IsDurable }) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = (msg_status(IsPersistent1, SeqId, Msg)) + MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, #msg_properties{})) #msg_status { is_delivered = true }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), PA1 = record_pending_ack(m(MsgStatus1), PA), @@ -579,12 +580,13 @@ ack(AckTags, State) -> AckTags, State)). tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, + MsgProperties, State = #vqstate { durable = IsDurable, msg_store_clients = MSCState }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), - store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }), + store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProperties} | Pubs] }), a(case IsPersistent andalso IsDurable of - true -> MsgStatus = msg_status(true, undefined, Msg), + true -> MsgStatus = msg_status(true, undefined, Msg, MsgProperties), {#msg_status { msg_on_disk = true }, MSCState1} = maybe_write_msg_to_disk(false, MsgStatus, MSCState), State #vqstate { msg_store_clients = MSCState1 }; @@ -606,7 +608,7 @@ tx_rollback(Txn, State = #vqstate { durable = IsDurable }) -> end, {lists:append(AckTags), a(State)}. -tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) -> +tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable }) -> #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), erase_tx(Txn), PubsOrdered = lists:reverse(Pubs), @@ -624,18 +626,21 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) -> HasPersistentPubs, PubsOrdered, AckTags1, Fun, State) end)}. -requeue(AckTags, State) -> +requeue(AckTags, MsgPropsFun, State) -> a(reduce_memory_use( ack(fun rabbit_msg_store:release/2, - fun (#msg_status { msg = Msg }, State1) -> - {_SeqId, State2} = publish(Msg, true, false, State1), + fun (#msg_status { msg = Msg, + msg_properties = MsgProperties }, State1) -> + {_SeqId, State2} = + publish(Msg, MsgPropsFun(MsgProperties), true, false, State1), State2; - ({IsPersistent, Guid}, State1) -> + ({IsPersistent, Guid, MsgProperties}, State1) -> #vqstate { msg_store_clients = MSCState } = State1, {{ok, Msg = #basic_message{}}, MSCState1} = read_from_msg_store(MSCState, IsPersistent, Guid), State2 = State1 #vqstate { msg_store_clients = MSCState1 }, - {_SeqId, State3} = publish(Msg, true, true, State2), + {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProperties), + true, true, State2), State3 end, AckTags, State))). @@ -783,10 +788,11 @@ one_if(false) -> 0. cons_if(true, E, L) -> [E | L]; cons_if(false, _E, L) -> L. -msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }) -> +msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, MsgProperties) -> #msg_status { seq_id = SeqId, guid = Guid, msg = Msg, is_persistent = IsPersistent, is_delivered = false, - msg_on_disk = false, index_on_disk = false }. + msg_on_disk = false, index_on_disk = false, + msg_properties = MsgProperties }. find_msg_store(true) -> ?PERSISTENT_MSG_STORE; find_msg_store(false) -> ?TRANSIENT_MSG_STORE. @@ -914,7 +920,7 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, true -> [AckTag || AckTag <- AckTags, case dict:fetch(AckTag, PA) of #msg_status {} -> false; - {IsPersistent, _Guid} -> IsPersistent + {IsPersistent, _Guid, _MsgProperties} -> IsPersistent end]; false -> [] end, @@ -946,10 +952,11 @@ tx_commit_index(State = #vqstate { on_sync = #sync { Pubs = lists:append(lists:reverse(SPubs)), {SeqIds, State1 = #vqstate { index_state = IndexState }} = lists:foldl( - fun (Msg = #basic_message { is_persistent = IsPersistent }, + fun ({Msg = #basic_message { is_persistent = IsPersistent }, MsgProperties}, {SeqIdsAcc, State2}) -> IsPersistent1 = IsDurable andalso IsPersistent, - {SeqId, State3} = publish(Msg, false, IsPersistent1, State2), + MsgProperties1 = MsgProperties, + {SeqId, State3} = publish(Msg, MsgProperties1, false, IsPersistent1, State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} end, {PAcks, ack(Acks, State)}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), @@ -996,7 +1003,7 @@ remove_queue_entries1( %%---------------------------------------------------------------------------- publish(Msg = #basic_message { is_persistent = IsPersistent }, - IsDelivered, MsgOnDisk, + MsgProperties, IsDelivered, MsgOnDisk, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, next_seq_id = SeqId, len = Len, @@ -1005,8 +1012,9 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, durable = IsDurable, ram_msg_count = RamMsgCount }) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = (msg_status(IsPersistent1, SeqId, Msg)) - #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk }, + MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProperties)) + #msg_status { is_delivered = IsDelivered, + msg_on_disk = MsgOnDisk}, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = case bpqueue:is_empty(Q3) of false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) }; @@ -1073,9 +1081,9 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, - msg_on_disk = MsgOnDisk } = MsgStatus, PA) -> + msg_on_disk = MsgOnDisk, msg_properties = MsgProperties } = MsgStatus, PA) -> AckEntry = case MsgOnDisk of - true -> {IsPersistent, Guid}; + true -> {IsPersistent, Guid, MsgProperties}; false -> MsgStatus end, dict:store(SeqId, AckEntry, PA). @@ -1128,7 +1136,7 @@ accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, index_on_disk = false }, Acc) -> Acc; -accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) -> +accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProperties}, {SeqIdsAcc, Dict}) -> {cons_if(IsPersistent, SeqId, SeqIdsAcc), rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}. -- cgit v1.2.1 From c144090eaa7463dffc0e3a647414f8bc6b12425c Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Thu, 19 Aug 2010 10:51:54 +0100 Subject: per-queue ttl working for non-transactionl messages without persistence --- src/rabbit_amqqueue_process.erl | 37 +++++++++++++++++++++++++++---------- src/rabbit_variable_queue.erl | 9 +++++---- 2 files changed, 32 insertions(+), 14 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2cee51f7..abd09a63 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -63,7 +63,7 @@ rate_timer_ref, expiry_timer_ref, stats_timer, - ttl + ttl }). -record(consumer, {tag, ack_required}). @@ -406,7 +406,7 @@ deliver_from_queue_pred(IsEmpty, _State) -> deliver_from_queue_deliver(AckRequired, false, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - {{Message, IsDelivered, AckTag, Remaining}, BQS1} = + {{Message, _MsgProperties, IsDelivered, AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), {{Message, IsDelivered, AckTag}, 0 == Remaining, State #q { backing_queue_state = BQS1 }}. @@ -440,7 +440,7 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> {true, NewState}; {false, NewState} -> %% Txn is none and no unblocked channels with consumers - MsgProperties = new_msg_properties(State), + MsgProperties = new_msg_properties(State), BQS = BQ:publish(Message, MsgProperties, State #q.backing_queue_state), {false, NewState#q{backing_queue_state = BQS}} end. @@ -577,7 +577,7 @@ calculate_msg_expiry(_State = #q{ttl = undefined}) -> undefined; calculate_msg_expiry(_State = #q{ttl = Ttl}) -> Now = timer:now_diff(now(), {0,0,0}), - Now - (Ttl * 1000). + Now + (Ttl * 1000). infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. @@ -621,6 +621,24 @@ emit_stats(State) -> [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]). %--------------------------------------------------------------------------- +fetch(AckRequired, State = #q{backing_queue_state = BQS, + backing_queue = BQ}) -> + case BQ:fetch(AckRequired, BQS) of + {empty, BQS1} = Result -> {empty, State#q{backing_queue_state = BQS1}}; + {{Message, MsgProperties, IsDelivered, AckTag, Remaining}, BQS1} -> + case msg_expired(MsgProperties) of + true -> + fetch(AckRequired, State#q{backing_queue_state = BQS1}); + false -> + {{Message, IsDelivered, AckTag, Remaining}, State#q{backing_queue_state = BQS1}} + end + end. + +msg_expired(MsgProperties = #msg_properties{expiry = undefined}) -> + false; +msg_expired(MsgProperties = #msg_properties{expiry=Expiry}) -> + Now = timer:now_diff(now(), {0,0,0}), + Now > Expiry. handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> @@ -699,13 +717,12 @@ handle_call({notify_down, ChPid}, _From, State) -> end; handle_call({basic_get, ChPid, NoAck}, _From, - State = #q{q = #amqqueue{name = QName}, - backing_queue_state = BQS, backing_queue = BQ}) -> + State = #q{q = #amqqueue{name = QName}}) -> AckRequired = not NoAck, State1 = ensure_expiry_timer(State), - case BQ:fetch(AckRequired, BQS) of - {empty, BQS1} -> reply(empty, State1#q{backing_queue_state = BQS1}); - {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> + case fetch(AckRequired, State1) of + {empty, State2} -> reply(empty, State2); + {{Message, IsDelivered, AckTag, Remaining}, State2} -> case AckRequired of true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), store_ch_record( @@ -713,7 +730,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, false -> ok end, Msg = {QName, self(), AckTag, IsDelivered, Message}, - reply({ok, Remaining, Msg}, State1#q{backing_queue_state = BQS1}) + reply({ok, Remaining, Msg}, State2) end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index e0bc75b7..591052a6 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -249,7 +249,7 @@ is_delivered, msg_on_disk, index_on_disk, - msg_properties + msg_properties }). -record(delta, @@ -533,7 +533,8 @@ fetch(AckRequired, State = #vqstate { q4 = Q4, {{value, MsgStatus = #msg_status { msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }}, + msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk, + msg_properties = MsgProperties }}, Q4a} -> %% 1. Mark it delivered if necessary @@ -564,7 +565,7 @@ fetch(AckRequired, State = #vqstate { q4 = Q4, PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), Len1 = Len - 1, - {{Msg, IsDelivered, AckTag, Len1}, + {{Msg, MsgProperties, IsDelivered, AckTag, Len1}, a(State #vqstate { q4 = Q4a, ram_msg_count = RamMsgCount - 1, out_counter = OutCount + 1, @@ -580,7 +581,7 @@ ack(AckTags, State) -> AckTags, State)). tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, - MsgProperties, + MsgProperties, State = #vqstate { durable = IsDurable, msg_store_clients = MSCState }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), -- cgit v1.2.1 From 435a267e5614817efda1631b1212702da244191d Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Thu, 19 Aug 2010 11:21:00 +0100 Subject: Expiry is reset on tx commit now. TX publish messages have correct TTL semantics --- src/rabbit_amqqueue_process.erl | 37 +++++++++++++++++++------------------ src/rabbit_variable_queue.erl | 9 ++++++--- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index abd09a63..2810a156 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -450,6 +450,19 @@ requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> maybe_run_queue_via_backing_queue( fun (BQS) -> BQ:requeue(AckTags, MsgPropsFun, BQS) end, State). +fetch(AckRequired, State = #q{backing_queue_state = BQS, + backing_queue = BQ}) -> + case BQ:fetch(AckRequired, BQS) of + {empty, BQS1} -> {empty, State#q{backing_queue_state = BQS1}}; + {{Message, MsgProperties, IsDelivered, AckTag, Remaining}, BQS1} -> + case msg_expired(MsgProperties) of + true -> + fetch(AckRequired, State#q{backing_queue_state = BQS1}); + false -> + {{Message, IsDelivered, AckTag, Remaining}, State#q{backing_queue_state = BQS1}} + end + end. + add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). remove_consumer(ChPid, ConsumerTag, Queue) -> @@ -565,6 +578,12 @@ rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ, subtract_acks(A, B) when is_list(B) -> lists:foldl(fun sets:del_element/2, A, B). +msg_expired(_MsgProperties = #msg_properties{expiry = undefined}) -> + false; +msg_expired(_MsgProperties = #msg_properties{expiry=Expiry}) -> + Now = timer:now_diff(now(), {0,0,0}), + Now > Expiry. + reset_msg_expiry_fun(State) -> fun(MsgProps) -> MsgProps#msg_properties{expiry=calculate_msg_expiry(State)} @@ -621,24 +640,6 @@ emit_stats(State) -> [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]). %--------------------------------------------------------------------------- -fetch(AckRequired, State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> - case BQ:fetch(AckRequired, BQS) of - {empty, BQS1} = Result -> {empty, State#q{backing_queue_state = BQS1}}; - {{Message, MsgProperties, IsDelivered, AckTag, Remaining}, BQS1} -> - case msg_expired(MsgProperties) of - true -> - fetch(AckRequired, State#q{backing_queue_state = BQS1}); - false -> - {{Message, IsDelivered, AckTag, Remaining}, State#q{backing_queue_state = BQS1}} - end - end. - -msg_expired(MsgProperties = #msg_properties{expiry = undefined}) -> - false; -msg_expired(MsgProperties = #msg_properties{expiry=Expiry}) -> - Now = timer:now_diff(now(), {0,0,0}), - Now > Expiry. handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 591052a6..4e978fd5 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -612,7 +612,11 @@ tx_rollback(Txn, State = #vqstate { durable = IsDurable }) -> tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable }) -> #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), erase_tx(Txn), - PubsOrdered = lists:reverse(Pubs), + F = fun({Msg, MsgProperties}) -> + {Msg, MsgPropsFun(MsgProperties)} + end, + PubsProcessed = lists:map(F, Pubs), + PubsOrdered = lists:reverse(PubsProcessed), AckTags1 = lists:append(AckTags), PersistentGuids = persistent_guids(PubsOrdered), HasPersistentPubs = PersistentGuids =/= [], @@ -956,8 +960,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync { fun ({Msg = #basic_message { is_persistent = IsPersistent }, MsgProperties}, {SeqIdsAcc, State2}) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgProperties1 = MsgProperties, - {SeqId, State3} = publish(Msg, MsgProperties1, false, IsPersistent1, State2), + {SeqId, State3} = publish(Msg, MsgProperties, false, IsPersistent1, State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} end, {PAcks, ack(Acks, State)}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), -- cgit v1.2.1 From 005a712e726f1e9a48dbcf0ef2721b3976672ae5 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Fri, 20 Aug 2010 11:28:57 +0100 Subject: in flight, want to test against default --- include/rabbit.hrl | 2 + src/rabbit_amqqueue_process.erl | 1 + src/rabbit_queue_index.erl | 83 ++++++++++++++++++++++++++++------------- src/rabbit_variable_queue.erl | 15 ++++++-- 4 files changed, 73 insertions(+), 28 deletions(-) diff --git a/include/rabbit.hrl b/include/rabbit.hrl index be0d4b82..82c60e93 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -76,6 +76,8 @@ -record(msg_properties, {expiry}). +-record(qientry, {guid, seq_id, expiry}). + %%---------------------------------------------------------------------------- -define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd."). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b6f898ce..f1bfba84 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -720,6 +720,7 @@ handle_call({notify_down, ChPid}, _From, State) -> handle_call({basic_get, ChPid, NoAck}, _From, State = #q{q = #amqqueue{name = QName}}) -> + io:format("Basic Get~n"), AckRequired = not NoAck, State1 = ensure_expiry_timer(State), case fetch(AckRequired, State1) of diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index d6b8bb28..3c3aef3e 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -31,7 +31,7 @@ -module(rabbit_queue_index). --export([init/4, terminate/2, delete_and_terminate/1, publish/4, +-export([init/4, terminate/2, delete_and_terminate/1, publish/3, deliver/2, ack/2, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). @@ -98,7 +98,7 @@ %% and seeding the message store on start up. %% %% Note that in general, the representation of a message's state as -%% the tuple: {('no_pub'|{Guid, IsPersistent}), ('del'|'no_del'), +%% the tuple: {('no_pub'|{IndexEntry, IsPersistent}), ('del'|'no_del'), %% ('ack'|'no_ack')} is richer than strictly necessary for most %% operations. However, for startup, and to ensure the safe and %% correct combination of journal entries with entries read from the @@ -141,14 +141,19 @@ -define(REL_SEQ_ONLY_ENTRY_LENGTH_BYTES, 2). %% publish record is binary 1 followed by a bit for is_persistent, -%% then 14 bits of rel seq id, and 128 bits of md5sum msg id +%% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits +%% of md5sum msg id -define(PUBLISH_PREFIX, 1). -define(PUBLISH_PREFIX_BITS, 1). +-define(EXPIRY_BYTES, 8). +-define(EXPIRY_BITS, (?EXPIRY_BYTES * 8)). +-define(NO_EXPIRY, 0). + -define(GUID_BYTES, 16). %% md5sum is 128 bit or 16 bytes -define(GUID_BITS, (?GUID_BYTES * 8)). -%% 16 bytes for md5sum + 2 for seq, bits and prefix --define(PUBLISH_RECORD_LENGTH_BYTES, ?GUID_BYTES + 2). +%% 16 bytes for md5sum + 8 for expiry + 2 for seq, bits and prefix +-define(PUBLISH_RECORD_LENGTH_BYTES, ?GUID_BYTES + ?EXPIRY_BYTES + 2). %% 1 publish, 1 deliver, 1 ack per msg -define(SEGMENT_TOTAL_SIZE, ?SEGMENT_ENTRY_COUNT * @@ -157,7 +162,7 @@ %% ---- misc ---- --define(PUB, {_, _}). %% {Guid, IsPersistent} +-define(PUB, {_, _, _}). %% {Guid, Expiry, IsPersistent} -define(READ_MODE, [binary, raw, read, {read_ahead, ?SEGMENT_TOTAL_SIZE}]). @@ -245,15 +250,18 @@ delete_and_terminate(State) -> ok = rabbit_misc:recursive_delete([Dir]), State1. -publish(Guid, SeqId, IsPersistent, State) when is_binary(Guid) -> +publish(#qientry{guid = Guid, seq_id = SeqId, expiry = Expiry}, + IsPersistent, State) when is_binary(Guid) -> ?GUID_BYTES = size(Guid), {JournalHdl, State1} = get_journal_handle(State), ok = file_handle_cache:append( JournalHdl, [<<(case IsPersistent of true -> ?PUB_PERSIST_JPREFIX; false -> ?PUB_TRANS_JPREFIX - end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid]), - maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State1)). + end):?JPREFIX_BITS, + SeqId:?SEQ_BITS>>, + create_pub_record_body(Guid, Expiry)]), + maybe_flush_journal(add_to_journal(SeqId, {Guid, Expiry, IsPersistent}, State1)). deliver(SeqIds, State) -> deliver_or_ack(del, SeqIds, State). @@ -282,9 +290,10 @@ flush(State) -> flush_journal(State). read(StartEnd, StartEnd, State) -> {[], State}; -read(Start, End, State = #qistate { segments = Segments, +read(Start, End, State = #qistate { segments = Segments, dir = Dir }) when Start =< End -> %% Start is inclusive, End is exclusive. + io:format("Reading~n"), LowerB = {StartSeg, _StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start), UpperB = {EndSeg, _EndRelSeq} = seq_id_to_seg_and_rel_seq_id(End - 1), {Messages, Segments1} = @@ -320,6 +329,7 @@ bounds(State = #qistate { segments = Segments }) -> {LowSeqId, NextSeqId, State}. recover(DurableQueues) -> + io:format("Recovering~n"), DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue), Queue} || Queue <- DurableQueues ]), QueuesDir = queues_dir(), @@ -386,6 +396,7 @@ store_clean_shutdown(Terms, Dir) -> rabbit_misc:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms). init_clean(RecoveredCounts, State) -> + io:format("Clean Init"), %% Load the journal. Since this is a clean recovery this (almost) %% gets us back to where we were on shutdown. State1 = #qistate { dir = Dir, segments = Segments } = load_journal(State), @@ -402,9 +413,11 @@ init_clean(RecoveredCounts, State) -> end, Segments, RecoveredCounts), %% the counts above include transient messages, which would be the %% wrong thing to return + io:format("Clean:~p~n", [Segments1]), {undefined, State1 # qistate { segments = Segments1 }}. init_dirty(CleanShutdown, ContainsCheckFun, State) -> + io:format("Dirty Init"), %% Recover the journal completely. This will also load segments %% which have entries in the journal and remove duplicates. The %% counts will correctly reflect the combination of the segment @@ -427,6 +440,7 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) -> %% Unconditionally flush since the dirty_count doesn't get updated %% by the above foldl. State2 = flush_journal(State1 #qistate { segments = Segments1 }), + io:format("Dirty:~p~n", [State2]), {Count, State2}. terminate(State = #qistate { journal_handle = JournalHdl, @@ -449,7 +463,7 @@ recover_segment(ContainsCheckFun, CleanShutdown, {SegEntries1, UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( - fun (RelSeq, {{Guid, _IsPersistent}, Del, no_ack}, Segment1) -> + fun (RelSeq, {{Guid, _Expiry, _IsPersistent}, Del, no_ack}, Segment1) -> recover_message(ContainsCheckFun(Guid), CleanShutdown, Del, RelSeq, Segment1) end, @@ -511,6 +525,26 @@ queue_index_walker_reader(QueueName, Gatherer) -> {_SegmentCounts, _State} = terminate(State), ok = gatherer:finish(Gatherer). +%%---------------------------------------------------------------------------- +%% expiry/binary manipulation +%%---------------------------------------------------------------------------- + +create_pub_record_body(Guid, Expiry) -> + io:format("Writing GUID: ~p~n", [Guid]), + [Guid, expiry_to_binary(Expiry)]. + +expiry_to_binary(undefined) -> + <>; +expiry_to_binary(Expiry) -> + <>. + +read_pub_record_body(Hdl) -> + {ok, Bin} = file_handle_cache:read(Hdl, ?GUID_BYTES + ?EXPIRY_BYTES), + <> = Bin, + <> = <>, + io:format("Read GUID: ~p~n", [Guid]), + {Guid, Expiry}. + %%---------------------------------------------------------------------------- %% journal manipulation %%---------------------------------------------------------------------------- @@ -588,6 +622,7 @@ append_journal_to_segment(#segment { journal_entries = JEntries, get_journal_handle(State = #qistate { journal_handle = undefined, dir = Dir }) -> Path = filename:join(Dir, ?JOURNAL_FILENAME), + io:format("Journal Path:~p~n", [Path]), {ok, Hdl} = file_handle_cache:open(Path, [write | ?READ_MODE], [{write_buffer, infinity}]), {Hdl, State #qistate { journal_handle = Hdl }}; @@ -631,14 +666,10 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> ?ACK_JPREFIX -> load_journal_entries(add_to_journal(SeqId, ack, State)); _ -> - case file_handle_cache:read(Hdl, ?GUID_BYTES) of - {ok, <>} -> - %% work around for binary data - %% fragmentation. See - %% rabbit_msg_file:read_next/2 - <> = - <>, - Publish = {Guid, case Prefix of + io:format("found a pub record"), + case read_pub_record_body(Hdl) of + {Guid, Expiry} -> + Publish = {Guid, Expiry, case Prefix of ?PUB_PERSIST_JPREFIX -> true; ?PUB_TRANS_JPREFIX -> false end}, @@ -739,11 +770,12 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> ok = case Pub of no_pub -> ok; - {Guid, IsPersistent} -> + {Guid, Expiry, IsPersistent} -> file_handle_cache:append( Hdl, [<>, Guid]) + RelSeq:?REL_SEQ_BITS>>, + create_pub_record_body(Guid, Expiry)]) end, ok = case {Del, Ack} of {no_del, no_ack} -> @@ -763,10 +795,10 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq}, {Messages, Segments}, Dir) -> Segment = segment_find_or_new(Seg, Dir, Segments), {segment_entries_foldr( - fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc) + fun (RelSeq, {{Guid, Expiry, IsPersistent}, IsDelivered, no_ack}, Acc) when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso (Seg < EndSeg orelse EndRelSeq >= RelSeq) -> - [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), + [ {Guid, Expiry, reconstruct_seq_id(StartSeg, RelSeq), IsPersistent, IsDelivered == del} | Acc ]; (_RelSeq, _Value, Acc) -> Acc @@ -789,6 +821,7 @@ load_segment(KeepAcked, #segment { path = Path }) -> {ok, 0} = file_handle_cache:position(Hdl, bof), Res = load_segment_entries(KeepAcked, Hdl, array_new(), 0), ok = file_handle_cache:close(Hdl), + io:format("Load segment: ~p~n", [Res]), Res end. @@ -798,8 +831,8 @@ load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) -> IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} -> %% because we specify /binary, and binaries are complete %% bytes, the size spec is in bytes, not bits. - {ok, Guid} = file_handle_cache:read(Hdl, ?GUID_BYTES), - Obj = {{Guid, 1 == IsPersistentNum}, no_del, no_ack}, + {Guid, Expiry} = read_pub_record_body(Hdl), + Obj = {{Guid, Expiry, 1 == IsPersistentNum}, no_del, no_ack}, SegEntries1 = array:set(RelSeq, Obj, SegEntries), load_segment_entries(KeepAcked, Hdl, SegEntries1, UnackedCount + 1); diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 4e978fd5..ff19b87f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1057,12 +1057,14 @@ maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION {MsgStatus, IndexState}; maybe_write_index_to_disk(Force, MsgStatus = #msg_status { - guid = Guid, seq_id = SeqId, + seq_id = SeqId, is_persistent = IsPersistent, - is_delivered = IsDelivered }, IndexState) + is_delivered = IsDelivered }, + IndexState) when Force orelse IsPersistent -> true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION - IndexState1 = rabbit_queue_index:publish(Guid, SeqId, IsPersistent, + IndexEntry = queue_index_entry(MsgStatus), + IndexState1 = rabbit_queue_index:publish(IndexEntry, IsPersistent, IndexState), {MsgStatus #msg_status { index_on_disk = true }, maybe_write_delivered(IsDelivered, SeqId, IndexState1)}; @@ -1079,6 +1081,12 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, {MsgStatus2, State #vqstate { index_state = IndexState1, msg_store_clients = MSCState1 }}. +queue_index_entry(#msg_status {guid = Guid, + seq_id = SeqId, + msg_properties = + #msg_properties{expiry = Expiry}}) -> + #qientry{guid = Guid, seq_id = SeqId, expiry = Expiry}. + %%---------------------------------------------------------------------------- %% Internal gubbins for acks %%---------------------------------------------------------------------------- @@ -1300,6 +1308,7 @@ maybe_deltas_to_betas(State = #vqstate { DeltaSeqId1 = lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId), DeltaSeqIdEnd]), + io:format("Reading~n"), {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), {Q3a, IndexState2} = betas_from_index_entries( -- cgit v1.2.1 From 45ac5a0081789c79882542c6124f6b3133ce468a Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Fri, 20 Aug 2010 13:25:31 +0100 Subject: reworked contract between qi and vq. tidied up in general --- include/rabbit.hrl | 2 -- src/rabbit_amqqueue_process.erl | 1 - src/rabbit_queue_index.erl | 55 +++++++++++++++++------------------------ src/rabbit_variable_queue.erl | 22 ++++++++--------- 4 files changed, 32 insertions(+), 48 deletions(-) diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 82c60e93..be0d4b82 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -76,8 +76,6 @@ -record(msg_properties, {expiry}). --record(qientry, {guid, seq_id, expiry}). - %%---------------------------------------------------------------------------- -define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd."). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f1bfba84..b6f898ce 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -720,7 +720,6 @@ handle_call({notify_down, ChPid}, _From, State) -> handle_call({basic_get, ChPid, NoAck}, _From, State = #q{q = #amqqueue{name = QName}}) -> - io:format("Basic Get~n"), AckRequired = not NoAck, State1 = ensure_expiry_timer(State), case fetch(AckRequired, State1) of diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 3c3aef3e..4f2168d1 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -31,7 +31,7 @@ -module(rabbit_queue_index). --export([init/4, terminate/2, delete_and_terminate/1, publish/3, +-export([init/4, terminate/2, delete_and_terminate/1, publish/5, deliver/2, ack/2, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). @@ -98,7 +98,7 @@ %% and seeding the message store on start up. %% %% Note that in general, the representation of a message's state as -%% the tuple: {('no_pub'|{IndexEntry, IsPersistent}), ('del'|'no_del'), +%% the tuple: {('no_pub'|{Guid, MsgProperties, IsPersistent}), ('del'|'no_del'), %% ('ack'|'no_ack')} is richer than strictly necessary for most %% operations. However, for startup, and to ensure the safe and %% correct combination of journal entries with entries read from the @@ -162,7 +162,7 @@ %% ---- misc ---- --define(PUB, {_, _, _}). %% {Guid, Expiry, IsPersistent} +-define(PUB, {_, _, _}). %% {Guid, MsgProperties, IsPersistent} -define(READ_MODE, [binary, raw, read, {read_ahead, ?SEGMENT_TOTAL_SIZE}]). @@ -203,15 +203,15 @@ {'undefined' | non_neg_integer(), [any()], qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). --spec(publish/4 :: (rabbit_guid:guid(), seq_id(), boolean(), qistate()) -> - qistate()). +-spec(publish/5 :: (rabbit_guid:guid(), seq_id(), msg_properties(), + boolean(), qistate()) -> qistate()). -spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()). -spec(ack/2 :: ([seq_id()], qistate()) -> qistate()). -spec(sync/2 :: ([seq_id()], qistate()) -> qistate()). -spec(flush/1 :: (qistate()) -> qistate()). -spec(read/3 :: (seq_id(), seq_id(), qistate()) -> - {[{rabbit_guid:guid(), seq_id(), boolean(), boolean()}], - qistate()}). + {[{rabbit_guid:guid(), seq_id(), msg_properties(), + boolean(), boolean()}], qistate()}). -spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). -spec(bounds/1 :: (qistate()) -> {non_neg_integer(), non_neg_integer(), qistate()}). @@ -250,8 +250,8 @@ delete_and_terminate(State) -> ok = rabbit_misc:recursive_delete([Dir]), State1. -publish(#qientry{guid = Guid, seq_id = SeqId, expiry = Expiry}, - IsPersistent, State) when is_binary(Guid) -> +publish(Guid, SeqId, MsgProperties, IsPersistent, State) + when is_binary(Guid) -> ?GUID_BYTES = size(Guid), {JournalHdl, State1} = get_journal_handle(State), ok = file_handle_cache:append( @@ -260,8 +260,8 @@ publish(#qientry{guid = Guid, seq_id = SeqId, expiry = Expiry}, false -> ?PUB_TRANS_JPREFIX end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, - create_pub_record_body(Guid, Expiry)]), - maybe_flush_journal(add_to_journal(SeqId, {Guid, Expiry, IsPersistent}, State1)). + create_pub_record_body(Guid, MsgProperties)]), + maybe_flush_journal(add_to_journal(SeqId, {Guid, MsgProperties, IsPersistent}, State1)). deliver(SeqIds, State) -> deliver_or_ack(del, SeqIds, State). @@ -293,7 +293,6 @@ read(StartEnd, StartEnd, State) -> read(Start, End, State = #qistate { segments = Segments, dir = Dir }) when Start =< End -> %% Start is inclusive, End is exclusive. - io:format("Reading~n"), LowerB = {StartSeg, _StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start), UpperB = {EndSeg, _EndRelSeq} = seq_id_to_seg_and_rel_seq_id(End - 1), {Messages, Segments1} = @@ -329,7 +328,6 @@ bounds(State = #qistate { segments = Segments }) -> {LowSeqId, NextSeqId, State}. recover(DurableQueues) -> - io:format("Recovering~n"), DurableDict = dict:from_list([ {queue_name_to_dir_name(Queue), Queue} || Queue <- DurableQueues ]), QueuesDir = queues_dir(), @@ -396,7 +394,6 @@ store_clean_shutdown(Terms, Dir) -> rabbit_misc:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms). init_clean(RecoveredCounts, State) -> - io:format("Clean Init"), %% Load the journal. Since this is a clean recovery this (almost) %% gets us back to where we were on shutdown. State1 = #qistate { dir = Dir, segments = Segments } = load_journal(State), @@ -413,11 +410,9 @@ init_clean(RecoveredCounts, State) -> end, Segments, RecoveredCounts), %% the counts above include transient messages, which would be the %% wrong thing to return - io:format("Clean:~p~n", [Segments1]), {undefined, State1 # qistate { segments = Segments1 }}. init_dirty(CleanShutdown, ContainsCheckFun, State) -> - io:format("Dirty Init"), %% Recover the journal completely. This will also load segments %% which have entries in the journal and remove duplicates. The %% counts will correctly reflect the combination of the segment @@ -440,7 +435,6 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) -> %% Unconditionally flush since the dirty_count doesn't get updated %% by the above foldl. State2 = flush_journal(State1 #qistate { segments = Segments1 }), - io:format("Dirty:~p~n", [State2]), {Count, State2}. terminate(State = #qistate { journal_handle = JournalHdl, @@ -463,7 +457,7 @@ recover_segment(ContainsCheckFun, CleanShutdown, {SegEntries1, UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( - fun (RelSeq, {{Guid, _Expiry, _IsPersistent}, Del, no_ack}, Segment1) -> + fun (RelSeq, {{Guid, _MsgProperties, _IsPersistent}, Del, no_ack}, Segment1) -> recover_message(ContainsCheckFun(Guid), CleanShutdown, Del, RelSeq, Segment1) end, @@ -529,8 +523,7 @@ queue_index_walker_reader(QueueName, Gatherer) -> %% expiry/binary manipulation %%---------------------------------------------------------------------------- -create_pub_record_body(Guid, Expiry) -> - io:format("Writing GUID: ~p~n", [Guid]), +create_pub_record_body(Guid, #msg_properties{expiry = Expiry}) -> [Guid, expiry_to_binary(Expiry)]. expiry_to_binary(undefined) -> @@ -542,8 +535,7 @@ read_pub_record_body(Hdl) -> {ok, Bin} = file_handle_cache:read(Hdl, ?GUID_BYTES + ?EXPIRY_BYTES), <> = Bin, <> = <>, - io:format("Read GUID: ~p~n", [Guid]), - {Guid, Expiry}. + {Guid, #msg_properties{expiry = Expiry}}. %%---------------------------------------------------------------------------- %% journal manipulation @@ -622,7 +614,6 @@ append_journal_to_segment(#segment { journal_entries = JEntries, get_journal_handle(State = #qistate { journal_handle = undefined, dir = Dir }) -> Path = filename:join(Dir, ?JOURNAL_FILENAME), - io:format("Journal Path:~p~n", [Path]), {ok, Hdl} = file_handle_cache:open(Path, [write | ?READ_MODE], [{write_buffer, infinity}]), {Hdl, State #qistate { journal_handle = Hdl }}; @@ -666,10 +657,9 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> ?ACK_JPREFIX -> load_journal_entries(add_to_journal(SeqId, ack, State)); _ -> - io:format("found a pub record"), case read_pub_record_body(Hdl) of - {Guid, Expiry} -> - Publish = {Guid, Expiry, case Prefix of + {Guid, MsgProperties} -> + Publish = {Guid, MsgProperties, case Prefix of ?PUB_PERSIST_JPREFIX -> true; ?PUB_TRANS_JPREFIX -> false end}, @@ -770,12 +760,12 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> ok = case Pub of no_pub -> ok; - {Guid, Expiry, IsPersistent} -> + {Guid, MsgProperties, IsPersistent} -> file_handle_cache:append( Hdl, [<>, - create_pub_record_body(Guid, Expiry)]) + create_pub_record_body(Guid, MsgProperties)]) end, ok = case {Del, Ack} of {no_del, no_ack} -> @@ -795,10 +785,10 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq}, {Messages, Segments}, Dir) -> Segment = segment_find_or_new(Seg, Dir, Segments), {segment_entries_foldr( - fun (RelSeq, {{Guid, Expiry, IsPersistent}, IsDelivered, no_ack}, Acc) + fun (RelSeq, {{Guid, MsgProperties, IsPersistent}, IsDelivered, no_ack}, Acc) when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso (Seg < EndSeg orelse EndRelSeq >= RelSeq) -> - [ {Guid, Expiry, reconstruct_seq_id(StartSeg, RelSeq), + [ {Guid, MsgProperties, reconstruct_seq_id(StartSeg, RelSeq), IsPersistent, IsDelivered == del} | Acc ]; (_RelSeq, _Value, Acc) -> Acc @@ -821,7 +811,6 @@ load_segment(KeepAcked, #segment { path = Path }) -> {ok, 0} = file_handle_cache:position(Hdl, bof), Res = load_segment_entries(KeepAcked, Hdl, array_new(), 0), ok = file_handle_cache:close(Hdl), - io:format("Load segment: ~p~n", [Res]), Res end. @@ -831,8 +820,8 @@ load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) -> IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} -> %% because we specify /binary, and binaries are complete %% bytes, the size spec is in bytes, not bits. - {Guid, Expiry} = read_pub_record_body(Hdl), - Obj = {{Guid, Expiry, 1 == IsPersistentNum}, no_del, no_ack}, + {Guid, MsgProperties} = read_pub_record_body(Hdl), + Obj = {{Guid, MsgProperties, 1 == IsPersistentNum}, no_del, no_ack}, SegEntries1 = array:set(RelSeq, Obj, SegEntries), load_segment_entries(KeepAcked, Hdl, SegEntries1, UnackedCount + 1); diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ff19b87f..e6ff18c9 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -837,7 +837,7 @@ persistent_guids(Pubs) -> betas_from_index_entries(List, TransientThreshold, IndexState) -> {Filtered, Delivers, Acks} = lists:foldr( - fun ({Guid, SeqId, IsPersistent, IsDelivered}, + fun ({Guid, MsgProperties, SeqId, IsPersistent, IsDelivered}, {Filtered1, Delivers1, Acks1}) -> case SeqId < TransientThreshold andalso not IsPersistent of true -> {Filtered1, @@ -849,7 +849,8 @@ betas_from_index_entries(List, TransientThreshold, IndexState) -> is_persistent = IsPersistent, is_delivered = IsDelivered, msg_on_disk = true, - index_on_disk = true + index_on_disk = true, + msg_properties = MsgProperties }) | Filtered1], Delivers1, Acks1} @@ -1057,14 +1058,18 @@ maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION {MsgStatus, IndexState}; maybe_write_index_to_disk(Force, MsgStatus = #msg_status { + guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, - is_delivered = IsDelivered }, + is_delivered = IsDelivered, + msg_properties = MsgProperties}, IndexState) when Force orelse IsPersistent -> true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION - IndexEntry = queue_index_entry(MsgStatus), - IndexState1 = rabbit_queue_index:publish(IndexEntry, IsPersistent, + IndexState1 = rabbit_queue_index:publish(Guid, + SeqId, + MsgProperties, + IsPersistent, IndexState), {MsgStatus #msg_status { index_on_disk = true }, maybe_write_delivered(IsDelivered, SeqId, IndexState1)}; @@ -1081,12 +1086,6 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, {MsgStatus2, State #vqstate { index_state = IndexState1, msg_store_clients = MSCState1 }}. -queue_index_entry(#msg_status {guid = Guid, - seq_id = SeqId, - msg_properties = - #msg_properties{expiry = Expiry}}) -> - #qientry{guid = Guid, seq_id = SeqId, expiry = Expiry}. - %%---------------------------------------------------------------------------- %% Internal gubbins for acks %%---------------------------------------------------------------------------- @@ -1308,7 +1307,6 @@ maybe_deltas_to_betas(State = #vqstate { DeltaSeqId1 = lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId), DeltaSeqIdEnd]), - io:format("Reading~n"), {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), {Q3a, IndexState2} = betas_from_index_entries( -- cgit v1.2.1 From f3222f2789a3ebcd2bdc735e546f1404a30b933d Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Fri, 20 Aug 2010 14:16:05 +0100 Subject: support for expiration during consume now --- src/rabbit_amqqueue_process.erl | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b6f898ce..8c0bc8b4 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -406,10 +406,9 @@ deliver_from_queue_pred(IsEmpty, _State) -> deliver_from_queue_deliver(AckRequired, false, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - {{Message, _MsgProperties, IsDelivered, AckTag, Remaining}, BQS1} = - BQ:fetch(AckRequired, BQS), - {{Message, IsDelivered, AckTag}, 0 == Remaining, - State #q { backing_queue_state = BQS1 }}. + {{Message, IsDelivered, AckTag, Remaining}, State1} = + fetch(AckRequired, State), + {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> Funs = {fun deliver_from_queue_pred/2, -- cgit v1.2.1 From c13b29f5527fc8c48336a7ea82f30022d42559d5 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Wed, 15 Sep 2010 10:47:57 +0100 Subject: erlang tests working again --- src/rabbit_queue_index.erl | 2 +- src/rabbit_tests.erl | 7 ++++--- src/rabbit_variable_queue.erl | 4 ++-- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 4f2168d1..c631f7a2 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -788,7 +788,7 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq}, fun (RelSeq, {{Guid, MsgProperties, IsPersistent}, IsDelivered, no_ack}, Acc) when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso (Seg < EndSeg orelse EndRelSeq >= RelSeq) -> - [ {Guid, MsgProperties, reconstruct_seq_id(StartSeg, RelSeq), + [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), MsgProperties, IsPersistent, IsDelivered == del} | Acc ]; (_RelSeq, _Value, Acc) -> Acc diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index d953edb5..6e9fd8c8 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1651,7 +1651,7 @@ queue_index_publish(SeqIds, Persistent, Qi) -> verify_read_with_published(_Delivered, _Persistent, [], _) -> ok; verify_read_with_published(Delivered, Persistent, - [{Guid, SeqId, Persistent, Delivered}|Read], + [{Guid, SeqId, _Props, Persistent, Delivered}|Read], [{SeqId, Guid}|Published]) -> verify_read_with_published(Delivered, Persistent, Read, Published); verify_read_with_published(_Delivered, _Persistent, _Read, _Published) -> @@ -1786,11 +1786,12 @@ variable_queue_publish(IsPersistent, Count, VQ) -> fun (_N, VQN) -> rabbit_variable_queue:publish( rabbit_basic:message( - rabbit_misc:r(<<>>, exchange, <<>>), + rabbit_misc:r(<<>>, exchange, <<>>), <<>>, #'P_basic'{delivery_mode = case IsPersistent of true -> 2; false -> 1 - end}, <<>>), VQN) + end}, <<>>), + #msg_properties{}, VQN) end, VQ, lists:seq(1, Count)). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 7f6b291f..aee8d47b 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -837,7 +837,7 @@ persistent_guids(Pubs) -> betas_from_index_entries(List, TransientThreshold, IndexState) -> {Filtered, Delivers, Acks} = lists:foldr( - fun ({Guid, MsgProperties, SeqId, IsPersistent, IsDelivered}, + fun ({Guid, SeqId, MsgProperties, IsPersistent, IsDelivered}, {Filtered1, Delivers1, Acks1}) -> case SeqId < TransientThreshold andalso not IsPersistent of true -> {Filtered1, @@ -850,7 +850,7 @@ betas_from_index_entries(List, TransientThreshold, IndexState) -> is_delivered = IsDelivered, msg_on_disk = true, index_on_disk = true, - msg_properties = MsgProperties + msg_properties = MsgProperties }) | Filtered1], Delivers1, Acks1} -- cgit v1.2.1 From 64b01c8f192ca92dd2c5d31697f0b0ffc41d79b2 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Wed, 15 Sep 2010 15:25:30 +0100 Subject: midway through fixing test_queue_recover --- src/rabbit_queue_index.erl | 10 ++++++++-- src/rabbit_tests.erl | 29 ++++++++++++++++++++++++----- src/rabbit_variable_queue.erl | 3 +++ 3 files changed, 35 insertions(+), 7 deletions(-) diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index c631f7a2..1b9cd382 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -427,6 +427,7 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) -> %% find the number of unacked messages. lists:foldl( fun (Seg, {Segments2, CountAcc}) -> + io:format("Recovering: ~p~n", [Seg]), Segment = #segment { unacked = UnackedCount } = recover_segment(ContainsCheckFun, CleanShutdown, segment_find_or_new(Seg, Dir, Segments2)), @@ -584,6 +585,7 @@ maybe_flush_journal(State) -> State. flush_journal(State = #qistate { segments = Segments }) -> + io:format("Flushing journal~n"), Segments1 = segment_fold( fun (#segment { unacked = 0, path = Path }, SegmentsN) -> @@ -709,7 +711,8 @@ all_segment_nums(#qistate { dir = Dir, segments = Segments }) -> segment_find_or_new(Seg, Dir, Segments) -> case segment_find(Seg, Segments) of {ok, Segment} -> Segment; - error -> SegName = integer_to_list(Seg) ++ ?SEGMENT_EXTENSION, + error -> io:format("New Seg~n"), + SegName = integer_to_list(Seg) ++ ?SEGMENT_EXTENSION, Path = filename:join(Dir, SegName), #segment { num = Seg, path = Path, @@ -805,8 +808,10 @@ segment_entries_foldr(Fun, Init, %% %% Does not do any combining with the journal at all. load_segment(KeepAcked, #segment { path = Path }) -> + io:format("path: ~p~n", [Path]), case filelib:is_file(Path) of - false -> {array_new(), 0}; + false -> io:format("Creating new~n"), + {array_new(), 0}; true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_MODE, []), {ok, 0} = file_handle_cache:position(Hdl, bof), Res = load_segment_entries(KeepAcked, Hdl, array_new(), 0), @@ -815,6 +820,7 @@ load_segment(KeepAcked, #segment { path = Path }) -> end. load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) -> + io:format("Loading seg entries"), case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of {ok, <>} -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 6e9fd8c8..79792786 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1414,6 +1414,7 @@ test_backing_queue() -> application:set_env(rabbit, msg_store_file_size_limit, FileSizeLimit, infinity), passed = test_queue_index(), + passed = test_queue_index_props(), passed = test_variable_queue(), passed = test_queue_recover(), application:set_env(rabbit, queue_index_max_journal_entries, @@ -1657,6 +1658,21 @@ verify_read_with_published(Delivered, Persistent, verify_read_with_published(_Delivered, _Persistent, _Read, _Published) -> ko. +test_queue_index_props() -> + with_empty_test_queue( + fun(Qi0) -> + Guid = rabbit_guid:guid(), + Props = #msg_properties{expiry=12345}, + Qi1 = rabbit_queue_index:publish(Guid, 1, Props, true, Qi0), + {[{Guid, 1, Props, _, _}], Qi2} = rabbit_queue_index:read(1, 2, Qi1), + Qi2 + end), + + ok = rabbit_variable_queue:stop(), + ok = rabbit_variable_queue:start([]), + + passed. + test_queue_index() -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), TwoSegs = SegmentSize + SegmentSize, @@ -1798,7 +1814,7 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> lists:foldl(fun (N, {VQN, AckTagsAcc}) -> Rem = Len - N, {{#basic_message { is_persistent = IsPersistent }, - IsDelivered, AckTagN, Rem}, VQM} = + _Props, IsDelivered, AckTagN, Rem}, VQM} = rabbit_variable_queue:fetch(true, VQN), {VQM, [AckTagN | AckTagsAcc]} end, {VQ, []}, lists:seq(1, Count)). @@ -1838,6 +1854,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% squeeze and relax queue Churn = Len div 32, VQ2 = publish_fetch_and_ack(Churn, Len, VQ1), + {Duration, VQ3} = rabbit_variable_queue:ram_duration(VQ2), VQ7 = lists:foldl( fun (Duration1, VQ4) -> @@ -1860,7 +1877,7 @@ publish_fetch_and_ack(0, _Len, VQ0) -> VQ0; publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), - {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), + {{_Msg, _MsgProps, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), publish_fetch_and_ack(N-1, Len, rabbit_variable_queue:ack([AckTag], VQ2)). test_variable_queue_partial_segments_delta_thing(VQ0) -> @@ -1924,7 +1941,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), VQ7 = rabbit_variable_queue:init(test_queue(), true, true), - {{_Msg1, true, _AckTag1, Count1}, VQ8} = + {{_Msg1, _Props, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), VQ9 = variable_queue_publish(false, 1, VQ8), VQ10 = rabbit_variable_queue:set_ram_duration_target(0, VQ9), @@ -1936,7 +1953,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), VQ2 = variable_queue_publish(false, 4, VQ1), {VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2), - VQ4 = rabbit_variable_queue:requeue(AckTags, VQ3), + VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), VQ7 = rabbit_variable_queue:init(test_queue(), true, true), @@ -1954,6 +1971,7 @@ test_queue_recover() -> sender = self(), message = Msg}, [true = rabbit_amqqueue:deliver(QPid, Delivery) || _ <- lists:seq(1, Count)], + io:format("Calling commit~n"), rabbit_amqqueue:commit_all([QPid], TxID, self()), exit(QPid, kill), MRef = erlang:monitor(process, QPid), @@ -1961,6 +1979,7 @@ test_queue_recover() -> after 10000 -> exit(timeout_waiting_for_queue_death) end, rabbit_amqqueue:stop(), + io:format("Restarting queue~n"), ok = rabbit_amqqueue:start(), rabbit_amqqueue:with_or_die( QName, @@ -1970,7 +1989,7 @@ test_queue_recover() -> rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), VQ1 = rabbit_variable_queue:init(QName, true, true), - {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = + {{_Msg1, _Props, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), rabbit_amqqueue:internal_delete(QName) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index aee8d47b..c8d3c6e4 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -370,6 +370,7 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). init(QueueName, IsDurable, Recover) -> + io:format("Initing ~p ~p~n", [QueueName, Recover]), {DeltaCount, Terms, IndexState} = rabbit_queue_index:init( QueueName, Recover, @@ -377,6 +378,7 @@ init(QueueName, IsDurable, Recover) -> fun (Guid) -> rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) end), + io:format("Inited: ~p~n", [DeltaCount]), {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), {PRef, TRef, Terms1} = @@ -956,6 +958,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync { PAcks = lists:append(SPAcks), Acks = lists:append(SAcks), Pubs = lists:append(lists:reverse(SPubs)), + io:format("Committing: ~p~n", [length(Pubs)]), {SeqIds, State1 = #vqstate { index_state = IndexState }} = lists:foldl( fun ({Msg = #basic_message { is_persistent = IsPersistent }, MsgProperties}, -- cgit v1.2.1 From 5b44c9dfb41aa12343bc573ad6ddf53c9568d4d0 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Wed, 15 Sep 2010 21:35:57 +0100 Subject: removed stray io:format calls --- src/rabbit_queue_index.erl | 10 ++-------- src/rabbit_variable_queue.erl | 3 --- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 1b9cd382..c631f7a2 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -427,7 +427,6 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) -> %% find the number of unacked messages. lists:foldl( fun (Seg, {Segments2, CountAcc}) -> - io:format("Recovering: ~p~n", [Seg]), Segment = #segment { unacked = UnackedCount } = recover_segment(ContainsCheckFun, CleanShutdown, segment_find_or_new(Seg, Dir, Segments2)), @@ -585,7 +584,6 @@ maybe_flush_journal(State) -> State. flush_journal(State = #qistate { segments = Segments }) -> - io:format("Flushing journal~n"), Segments1 = segment_fold( fun (#segment { unacked = 0, path = Path }, SegmentsN) -> @@ -711,8 +709,7 @@ all_segment_nums(#qistate { dir = Dir, segments = Segments }) -> segment_find_or_new(Seg, Dir, Segments) -> case segment_find(Seg, Segments) of {ok, Segment} -> Segment; - error -> io:format("New Seg~n"), - SegName = integer_to_list(Seg) ++ ?SEGMENT_EXTENSION, + error -> SegName = integer_to_list(Seg) ++ ?SEGMENT_EXTENSION, Path = filename:join(Dir, SegName), #segment { num = Seg, path = Path, @@ -808,10 +805,8 @@ segment_entries_foldr(Fun, Init, %% %% Does not do any combining with the journal at all. load_segment(KeepAcked, #segment { path = Path }) -> - io:format("path: ~p~n", [Path]), case filelib:is_file(Path) of - false -> io:format("Creating new~n"), - {array_new(), 0}; + false -> {array_new(), 0}; true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_MODE, []), {ok, 0} = file_handle_cache:position(Hdl, bof), Res = load_segment_entries(KeepAcked, Hdl, array_new(), 0), @@ -820,7 +815,6 @@ load_segment(KeepAcked, #segment { path = Path }) -> end. load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) -> - io:format("Loading seg entries"), case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of {ok, <>} -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c8d3c6e4..aee8d47b 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -370,7 +370,6 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). init(QueueName, IsDurable, Recover) -> - io:format("Initing ~p ~p~n", [QueueName, Recover]), {DeltaCount, Terms, IndexState} = rabbit_queue_index:init( QueueName, Recover, @@ -378,7 +377,6 @@ init(QueueName, IsDurable, Recover) -> fun (Guid) -> rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) end), - io:format("Inited: ~p~n", [DeltaCount]), {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), {PRef, TRef, Terms1} = @@ -958,7 +956,6 @@ tx_commit_index(State = #vqstate { on_sync = #sync { PAcks = lists:append(SPAcks), Acks = lists:append(SAcks), Pubs = lists:append(lists:reverse(SPubs)), - io:format("Committing: ~p~n", [length(Pubs)]), {SeqIds, State1 = #vqstate { index_state = IndexState }} = lists:foldl( fun ({Msg = #basic_message { is_persistent = IsPersistent }, MsgProperties}, -- cgit v1.2.1 From c01e0e4c2a66a9b56b9daeeee2480bf11fc8706a Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Thu, 16 Sep 2010 00:14:04 +0100 Subject: whitespace clean up --- src/rabbit_queue_index.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index c631f7a2..d0b5d31a 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -765,7 +765,7 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> Hdl, [<>, - create_pub_record_body(Guid, MsgProperties)]) + create_pub_record_body(Guid, MsgProperties)]) end, ok = case {Del, Ack} of {no_del, no_ack} -> @@ -820,7 +820,7 @@ load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) -> IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} -> %% because we specify /binary, and binaries are complete %% bytes, the size spec is in bytes, not bits. - {Guid, MsgProperties} = read_pub_record_body(Hdl), + {Guid, MsgProperties} = read_pub_record_body(Hdl), Obj = {{Guid, MsgProperties, 1 == IsPersistentNum}, no_del, no_ack}, SegEntries1 = array:set(RelSeq, Obj, SegEntries), load_segment_entries(KeepAcked, Hdl, SegEntries1, -- cgit v1.2.1 From 8d6329dea7d928c4913f2e6ad7ac532033b1d440 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Thu, 16 Sep 2010 11:43:06 +0100 Subject: fixed bug in persistent_guids vq --- src/rabbit_variable_queue.erl | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index aee8d47b..15167af0 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -613,8 +613,8 @@ tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable }) -> #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), erase_tx(Txn), F = fun({Msg, MsgProperties}) -> - {Msg, MsgPropsFun(MsgProperties)} - end, + {Msg, MsgPropsFun(MsgProperties)} + end, PubsProcessed = lists:map(F, Pubs), PubsOrdered = lists:reverse(PubsProcessed), AckTags1 = lists:append(AckTags), @@ -832,7 +832,8 @@ store_tx(Txn, Tx) -> put({txn, Txn}, Tx). erase_tx(Txn) -> erase({txn, Txn}). persistent_guids(Pubs) -> - [Guid || #basic_message { guid = Guid, is_persistent = true } <- Pubs]. + [Guid || + {#basic_message { guid = Guid, is_persistent = true }, _MsgProps} <- Pubs]. betas_from_index_entries(List, TransientThreshold, IndexState) -> {Filtered, Delivers, Acks} = -- cgit v1.2.1 From defe09a04be19bf532c404d5adca407af9cfcfa2 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Thu, 16 Sep 2010 12:49:46 +0100 Subject: fixed test_queue_recover. recovery is working again with per-queue ttl --- src/rabbit_queue_index.erl | 8 ++++++-- src/rabbit_tests.erl | 2 -- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index d0b5d31a..b5e92dca 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -510,7 +510,7 @@ queue_index_walker_reader(QueueName, Gatherer) -> State = #qistate { segments = Segments, dir = Dir } = recover_journal(blank_state(QueueName, false)), [ok = segment_entries_foldr( - fun (_RelSeq, {{Guid, true}, _IsDelivered, no_ack}, ok) -> + fun (_RelSeq, {{Guid, _MsgProps, true}, _IsDelivered, no_ack}, ok) -> gatherer:in(Gatherer, {Guid, 1}); (_RelSeq, _Value, Acc) -> Acc @@ -535,7 +535,11 @@ read_pub_record_body(Hdl) -> {ok, Bin} = file_handle_cache:read(Hdl, ?GUID_BYTES + ?EXPIRY_BYTES), <> = Bin, <> = <>, - {Guid, #msg_properties{expiry = Expiry}}. + Exp = case Expiry of + ?NO_EXPIRY -> undefined; + X -> X + end, + {Guid, #msg_properties{expiry = Exp}}. %%---------------------------------------------------------------------------- %% journal manipulation diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 79792786..805e89f8 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1971,7 +1971,6 @@ test_queue_recover() -> sender = self(), message = Msg}, [true = rabbit_amqqueue:deliver(QPid, Delivery) || _ <- lists:seq(1, Count)], - io:format("Calling commit~n"), rabbit_amqqueue:commit_all([QPid], TxID, self()), exit(QPid, kill), MRef = erlang:monitor(process, QPid), @@ -1979,7 +1978,6 @@ test_queue_recover() -> after 10000 -> exit(timeout_waiting_for_queue_death) end, rabbit_amqqueue:stop(), - io:format("Restarting queue~n"), ok = rabbit_amqqueue:start(), rabbit_amqqueue:with_or_die( QName, -- cgit v1.2.1 From 38b735411ae58bb1f0e767f5d41e433b1432c34d Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Fri, 17 Sep 2010 11:04:39 +0100 Subject: some cosmetic changes and the beginnings of invariable_queue with new api --- src/rabbit_amqqueue_process.erl | 4 +--- src/rabbit_invariable_queue.erl | 22 +++++++++++++--------- src/rabbit_tests.erl | 2 +- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 8d5699be..1ea9d181 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -402,9 +402,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty. -deliver_from_queue_deliver(AckRequired, false, - State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> +deliver_from_queue_deliver(AckRequired, false, State) -> {{Message, IsDelivered, AckTag, Remaining}, State1} = fetch(AckRequired, State), {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 4e0dad84..44489bf9 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -31,7 +31,7 @@ -module(rabbit_invariable_queue). --export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/2, +-export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3, publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3, tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, @@ -89,9 +89,10 @@ purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable, %% We do not purge messages pending acks. {AckTags, PA} = rabbit_misc:queue_fold( - fun ({#basic_message { is_persistent = false }, _IsDelivered}, Acc) -> + fun ({#basic_message { is_persistent = false }, _MsgProps, _IsDelivered}, + Acc) -> Acc; - ({Msg = #basic_message { guid = Guid }, IsDelivered}, + ({Msg = #basic_message { guid = Guid }, _MsgProps, IsDelivered}, {AckTagsN, PAN}) -> ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), {[Guid | AckTagsN], dict:store(Guid, Msg, PAN)} @@ -99,10 +100,13 @@ purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable, ok = persist_acks(QName, IsDurable, none, AckTags, PA), {Len, State #iv_state { len = 0, queue = queue:new() }}. -publish(Msg, State = #iv_state { queue = Q, qname = QName, durable = IsDurable, - len = Len }) -> +publish(Msg, MsgProps, State = #iv_state { queue = Q, + qname = QName, + durable = IsDurable, + len = Len }) -> ok = persist_message(QName, IsDurable, none, Msg), - State #iv_state { queue = queue:in({Msg, false}, Q), len = Len + 1 }. + QueueItem = {Msg, MsgProps, false}, + State #iv_state { queue = queue:in(QueueItem, Q), len = Len + 1 }. publish_delivered(false, _Msg, State) -> {blank_ack, State}; @@ -118,8 +122,8 @@ fetch(_AckRequired, State = #iv_state { len = 0 }) -> fetch(AckRequired, State = #iv_state { len = Len, queue = Q, qname = QName, durable = IsDurable, pending_ack = PA }) -> - {{value, {Msg = #basic_message { guid = Guid }, IsDelivered}}, Q1} = - queue:out(Q), + {{value, {Msg = #basic_message { guid = Guid }, MsgProps, IsDelivered}}, + Q1} = queue:out(Q), Len1 = Len - 1, ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), PA1 = dict:store(Guid, Msg, PA), @@ -129,7 +133,7 @@ fetch(AckRequired, State = #iv_state { len = Len, queue = Q, qname = QName, [Guid], PA1), {blank_ack, PA} end, - {{Msg, IsDelivered, AckTag, Len1}, + {{Msg, MsgProps, IsDelivered, AckTag, Len1}, State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}. ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 805e89f8..fdead8f9 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1414,7 +1414,7 @@ test_backing_queue() -> application:set_env(rabbit, msg_store_file_size_limit, FileSizeLimit, infinity), passed = test_queue_index(), - passed = test_queue_index_props(), + passed = test_queue_index_props(), passed = test_variable_queue(), passed = test_queue_recover(), application:set_env(rabbit, queue_index_max_journal_entries, -- cgit v1.2.1 From 11568e8447b4ff39861b730896800c942c469498 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Fri, 17 Sep 2010 12:57:10 +0100 Subject: added msg_properties to publish_delivered so they can be rehydrated later --- src/rabbit_backing_queue.erl | 2 +- src/rabbit_variable_queue.erl | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index cc7f8571..5cb78368 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -67,7 +67,7 @@ behaviour_info(callbacks) -> %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls %% (i.e. saves the round trip through the backing queue). - {publish_delivered, 3}, + {publish_delivered, 4}, %% Produce the next message. {fetch, 2}, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 15167af0..ddcf958f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -32,7 +32,7 @@ -module(rabbit_variable_queue). -export([init/3, terminate/1, delete_and_terminate/1, - purge/1, publish/3, publish_delivered/3, fetch/2, ack/2, + purge/1, publish/3, publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, @@ -495,9 +495,10 @@ publish(Msg, MsgProperties, State) -> {_SeqId, State1} = publish(Msg, MsgProperties, false, false, State), a(reduce_memory_use(State1)). -publish_delivered(false, _Msg, State = #vqstate { len = 0 }) -> +publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) -> {blank_ack, a(State)}; publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, + MsgProps, State = #vqstate { len = 0, next_seq_id = SeqId, out_counter = OutCount, @@ -506,7 +507,7 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, pending_ack = PA, durable = IsDurable }) -> IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, #msg_properties{})) + MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = true }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), PA1 = record_pending_ack(m(MsgStatus1), PA), -- cgit v1.2.1 From 810233739b8043ad61c8f5b6a864ff92766ac16d Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Fri, 17 Sep 2010 13:42:41 +0100 Subject: invariable queue updated for backing queue api --- src/rabbit_invariable_queue.erl | 47 ++++++++++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 44489bf9..3eea7bec 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -32,8 +32,8 @@ -module(rabbit_invariable_queue). -export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3, - publish_delivered/3, fetch/2, ack/2, tx_publish/3, tx_ack/3, - tx_rollback/2, tx_commit/3, requeue/2, len/1, is_empty/1, + publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, + tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, status/1]). @@ -92,10 +92,10 @@ purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable, fun ({#basic_message { is_persistent = false }, _MsgProps, _IsDelivered}, Acc) -> Acc; - ({Msg = #basic_message { guid = Guid }, _MsgProps, IsDelivered}, + ({Msg = #basic_message { guid = Guid }, MsgProps, IsDelivered}, {AckTagsN, PAN}) -> ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), - {[Guid | AckTagsN], dict:store(Guid, Msg, PAN)} + {[Guid | AckTagsN], store_ack(Msg, MsgProps, PAN)} end, {[], dict:new()}, Q), ok = persist_acks(QName, IsDurable, none, AckTags, PA), {Len, State #iv_state { len = 0, queue = queue:new() }}. @@ -105,17 +105,18 @@ publish(Msg, MsgProps, State = #iv_state { queue = Q, durable = IsDurable, len = Len }) -> ok = persist_message(QName, IsDurable, none, Msg), - QueueItem = {Msg, MsgProps, false}, - State #iv_state { queue = queue:in(QueueItem, Q), len = Len + 1 }. + Q1 = enqueue(Msg, MsgProps, false, Q), + State #iv_state { queue = Q1, len = Len + 1 }. -publish_delivered(false, _Msg, State) -> +publish_delivered(false, _Msg, _MsgProps, State) -> {blank_ack, State}; publish_delivered(true, Msg = #basic_message { guid = Guid }, + MsgProps, State = #iv_state { qname = QName, durable = IsDurable, len = 0, pending_ack = PA }) -> ok = persist_message(QName, IsDurable, none, Msg), ok = persist_delivery(QName, IsDurable, false, Msg), - {Guid, State #iv_state { pending_ack = dict:store(Guid, Msg, PA) }}. + {Guid, State #iv_state { pending_ack = store_ack(Msg, MsgProps, PA) }}. fetch(_AckRequired, State = #iv_state { len = 0 }) -> {empty, State}; @@ -126,7 +127,7 @@ fetch(AckRequired, State = #iv_state { len = Len, queue = Q, qname = QName, Q1} = queue:out(Q), Len1 = Len - 1, ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), - PA1 = dict:store(Guid, Msg, PA), + PA1 = store_ack(Msg, MsgProps, PA), {AckTag, PA2} = case AckRequired of true -> {Guid, PA1}; false -> ok = persist_acks(QName, IsDurable, none, @@ -142,10 +143,10 @@ ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable, PA1 = remove_acks(AckTags, PA), State #iv_state { pending_ack = PA1 }. -tx_publish(Txn, Msg, State = #iv_state { qname = QName, +tx_publish(Txn, Msg, MsgProps, State = #iv_state { qname = QName, durable = IsDurable }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), - store_tx(Txn, Tx #tx { pending_messages = [Msg | Pubs] }), + store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }), ok = persist_message(QName, IsDurable, Txn, Msg), State. @@ -163,7 +164,8 @@ tx_rollback(Txn, State = #iv_state { qname = QName }) -> erase_tx(Txn), {lists:flatten(AckTags), State}. -tx_commit(Txn, Fun, State = #iv_state { qname = QName, pending_ack = PA, +tx_commit(Txn, Fun, MsgPropsFun, + State = #iv_state { qname = QName, pending_ack = PA, queue = Q, len = Len }) -> #tx { pending_acks = AckTags, pending_messages = PubsRev } = lookup_tx(Txn), ok = do_if_persistent(fun rabbit_persister:commit_transaction/1, @@ -172,12 +174,14 @@ tx_commit(Txn, Fun, State = #iv_state { qname = QName, pending_ack = PA, Fun(), AckTags1 = lists:flatten(AckTags), PA1 = remove_acks(AckTags1, PA), - {Q1, Len1} = lists:foldr(fun (Msg, {QN, LenN}) -> - {queue:in({Msg, false}, QN), LenN + 1} + {Q1, Len1} = lists:foldr(fun ({Msg, MsgProps}, {QN, LenN}) -> + MsgProps1 = MsgPropsFun(MsgProps), + QN = enqueue(Msg, MsgProps1, false, Q), + {QN, LenN + 1} end, {Q, Len}, PubsRev), {AckTags1, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }}. -requeue(AckTags, State = #iv_state { pending_ack = PA, queue = Q, +requeue(AckTags, MsgPropsFun, State = #iv_state { pending_ack = PA, queue = Q, len = Len }) -> %% We don't need to touch the persister here - the persister will %% already have these messages published and delivered as @@ -190,12 +194,18 @@ requeue(AckTags, State = #iv_state { pending_ack = PA, queue = Q, %% order to the last known state of our queue, prior to shutdown. {Q1, Len1} = lists:foldl( fun (Guid, {QN, LenN}) -> - {ok, Msg = #basic_message {}} = dict:find(Guid, PA), - {queue:in({Msg, true}, QN), LenN + 1} + {ok, {Msg = #basic_message {}, MsgProps}} + = dict:find(Guid, PA), + MsgProps1 = MsgPropsFun(MsgProps), + {enqueue(Msg, MsgProps1, true, QN), LenN + 1} end, {Q, Len}, AckTags), PA1 = remove_acks(AckTags, PA), State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }. +enqueue(Msg, MsgProps, IsDelivered, Q) -> + I = {Msg, MsgProps, IsDelivered}, + queue:in(I, Q). + len(#iv_state { len = Len }) -> Len. is_empty(State) -> 0 == len(State). @@ -216,6 +226,9 @@ status(_State) -> []. remove_acks(AckTags, PA) -> lists:foldl(fun dict:erase/2, PA, AckTags). +store_ack(Msg = #basic_message { guid = Guid}, MsgProps, PA) -> + dict:store(Guid, {Msg, MsgProps}, PA). + %%---------------------------------------------------------------------------- lookup_tx(Txn) -> -- cgit v1.2.1 From 7794fbfccc040408c522aa8123fe8b587556c733 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Tue, 21 Sep 2010 14:29:15 +0100 Subject: cosmetic fixes - tabs to spaces --- src/rabbit_amqqueue.erl | 2 +- src/rabbit_amqqueue_process.erl | 18 +++++++++--------- src/rabbit_invariable_queue.erl | 12 ++++++------ src/rabbit_queue_index.erl | 10 +++++----- src/rabbit_tests.erl | 12 ++++++------ src/rabbit_variable_queue.erl | 26 +++++++++++++------------- 6 files changed, 40 insertions(+), 40 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ec25a871..60e37094 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -311,7 +311,7 @@ check_declare_arguments(QueueName, Args) -> "invalid arg '~s' for ~s: ~w", [Key, rabbit_misc:rs(QueueName), Error]) end || {Key, Fun} <- [{<<"x-expires">>, fun check_expires_argument/1}, - {<<"x-message-ttl">>, fun check_message_ttl_argument/1}]], + {<<"x-message-ttl">>, fun check_message_ttl_argument/1}]], ok. check_expires_argument(undefined) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 429f6644..6c420ed8 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -446,16 +446,16 @@ requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> fun (BQS) -> BQ:requeue(AckTags, MsgPropsFun, BQS) end, State). fetch(AckRequired, State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> + backing_queue = BQ}) -> case BQ:fetch(AckRequired, BQS) of {empty, BQS1} -> {empty, State#q{backing_queue_state = BQS1}}; {{Message, MsgProperties, IsDelivered, AckTag, Remaining}, BQS1} -> - case msg_expired(MsgProperties) of - true -> - fetch(AckRequired, State#q{backing_queue_state = BQS1}); - false -> - {{Message, IsDelivered, AckTag, Remaining}, State#q{backing_queue_state = BQS1}} - end + case msg_expired(MsgProperties) of + true -> + fetch(AckRequired, State#q{backing_queue_state = BQS1}); + false -> + {{Message, IsDelivered, AckTag, Remaining}, State#q{backing_queue_state = BQS1}} + end end. add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). @@ -581,7 +581,7 @@ msg_expired(_MsgProperties = #msg_properties{expiry=Expiry}) -> reset_msg_expiry_fun(State) -> fun(MsgProps) -> - MsgProps#msg_properties{expiry=calculate_msg_expiry(State)} + MsgProps#msg_properties{expiry=calculate_msg_expiry(State)} end. new_msg_properties(State) -> @@ -591,7 +591,7 @@ calculate_msg_expiry(_State = #q{ttl = undefined}) -> undefined; calculate_msg_expiry(_State = #q{ttl = Ttl}) -> Now = timer:now_diff(now(), {0,0,0}), - Now + (Ttl * 1000). + Now + (Ttl * 1000). infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 3eea7bec..14afd767 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -111,7 +111,7 @@ publish(Msg, MsgProps, State = #iv_state { queue = Q, publish_delivered(false, _Msg, _MsgProps, State) -> {blank_ack, State}; publish_delivered(true, Msg = #basic_message { guid = Guid }, - MsgProps, + MsgProps, State = #iv_state { qname = QName, durable = IsDurable, len = 0, pending_ack = PA }) -> ok = persist_message(QName, IsDurable, none, Msg), @@ -165,7 +165,7 @@ tx_rollback(Txn, State = #iv_state { qname = QName }) -> {lists:flatten(AckTags), State}. tx_commit(Txn, Fun, MsgPropsFun, - State = #iv_state { qname = QName, pending_ack = PA, + State = #iv_state { qname = QName, pending_ack = PA, queue = Q, len = Len }) -> #tx { pending_acks = AckTags, pending_messages = PubsRev } = lookup_tx(Txn), ok = do_if_persistent(fun rabbit_persister:commit_transaction/1, @@ -175,8 +175,8 @@ tx_commit(Txn, Fun, MsgPropsFun, AckTags1 = lists:flatten(AckTags), PA1 = remove_acks(AckTags1, PA), {Q1, Len1} = lists:foldr(fun ({Msg, MsgProps}, {QN, LenN}) -> - MsgProps1 = MsgPropsFun(MsgProps), - QN = enqueue(Msg, MsgProps1, false, Q), + MsgProps1 = MsgPropsFun(MsgProps), + QN = enqueue(Msg, MsgProps1, false, Q), {QN, LenN + 1} end, {Q, Len}, PubsRev), {AckTags1, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }}. @@ -195,8 +195,8 @@ requeue(AckTags, MsgPropsFun, State = #iv_state { pending_ack = PA, queue = Q, {Q1, Len1} = lists:foldl( fun (Guid, {QN, LenN}) -> {ok, {Msg = #basic_message {}, MsgProps}} - = dict:find(Guid, PA), - MsgProps1 = MsgPropsFun(MsgProps), + = dict:find(Guid, PA), + MsgProps1 = MsgPropsFun(MsgProps), {enqueue(Msg, MsgProps1, true, QN), LenN + 1} end, {Q, Len}, AckTags), PA1 = remove_acks(AckTags, PA), diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index b5e92dca..ed04c1e1 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -204,14 +204,14 @@ -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). -spec(publish/5 :: (rabbit_guid:guid(), seq_id(), msg_properties(), - boolean(), qistate()) -> qistate()). + boolean(), qistate()) -> qistate()). -spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()). -spec(ack/2 :: ([seq_id()], qistate()) -> qistate()). -spec(sync/2 :: ([seq_id()], qistate()) -> qistate()). -spec(flush/1 :: (qistate()) -> qistate()). -spec(read/3 :: (seq_id(), seq_id(), qistate()) -> {[{rabbit_guid:guid(), seq_id(), msg_properties(), - boolean(), boolean()}], qistate()}). + boolean(), boolean()}], qistate()}). -spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). -spec(bounds/1 :: (qistate()) -> {non_neg_integer(), non_neg_integer(), qistate()}). @@ -259,8 +259,8 @@ publish(Guid, SeqId, MsgProperties, IsPersistent, State) true -> ?PUB_PERSIST_JPREFIX; false -> ?PUB_TRANS_JPREFIX end):?JPREFIX_BITS, - SeqId:?SEQ_BITS>>, - create_pub_record_body(Guid, MsgProperties)]), + SeqId:?SEQ_BITS>>, + create_pub_record_body(Guid, MsgProperties)]), maybe_flush_journal(add_to_journal(SeqId, {Guid, MsgProperties, IsPersistent}, State1)). deliver(SeqIds, State) -> @@ -540,7 +540,7 @@ read_pub_record_body(Hdl) -> X -> X end, {Guid, #msg_properties{expiry = Exp}}. - + %%---------------------------------------------------------------------------- %% journal manipulation %%---------------------------------------------------------------------------- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index fdead8f9..a5059f87 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1661,11 +1661,11 @@ verify_read_with_published(_Delivered, _Persistent, _Read, _Published) -> test_queue_index_props() -> with_empty_test_queue( fun(Qi0) -> - Guid = rabbit_guid:guid(), - Props = #msg_properties{expiry=12345}, - Qi1 = rabbit_queue_index:publish(Guid, 1, Props, true, Qi0), - {[{Guid, 1, Props, _, _}], Qi2} = rabbit_queue_index:read(1, 2, Qi1), - Qi2 + Guid = rabbit_guid:guid(), + Props = #msg_properties{expiry=12345}, + Qi1 = rabbit_queue_index:publish(Guid, 1, Props, true, Qi0), + {[{Guid, 1, Props, _, _}], Qi2} = rabbit_queue_index:read(1, 2, Qi1), + Qi2 end), ok = rabbit_variable_queue:stop(), @@ -1814,7 +1814,7 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> lists:foldl(fun (N, {VQN, AckTagsAcc}) -> Rem = Len - N, {{#basic_message { is_persistent = IsPersistent }, - _Props, IsDelivered, AckTagN, Rem}, VQM} = + _Props, IsDelivered, AckTagN, Rem}, VQM} = rabbit_variable_queue:fetch(true, VQN), {VQM, [AckTagN | AckTagsAcc]} end, {VQ, []}, lists:seq(1, Count)). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ddcf958f..5ffd6b61 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -498,7 +498,7 @@ publish(Msg, MsgProperties, State) -> publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) -> {blank_ack, a(State)}; publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, - MsgProps, + MsgProps, State = #vqstate { len = 0, next_seq_id = SeqId, out_counter = OutCount, @@ -636,9 +636,9 @@ requeue(AckTags, MsgPropsFun, State) -> a(reduce_memory_use( ack(fun rabbit_msg_store:release/2, fun (#msg_status { msg = Msg, - msg_properties = MsgProperties }, State1) -> + msg_properties = MsgProperties }, State1) -> {_SeqId, State2} = - publish(Msg, MsgPropsFun(MsgProperties), true, false, State1), + publish(Msg, MsgPropsFun(MsgProperties), true, false, State1), State2; ({IsPersistent, Guid, MsgProperties}, State1) -> #vqstate { msg_store_clients = MSCState } = State1, @@ -646,7 +646,7 @@ requeue(AckTags, MsgPropsFun, State) -> read_from_msg_store(MSCState, IsPersistent, Guid), State2 = State1 #vqstate { msg_store_clients = MSCState1 }, {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProperties), - true, true, State2), + true, true, State2), State3 end, AckTags, State))). @@ -798,7 +798,7 @@ msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, MsgPropert #msg_status { seq_id = SeqId, guid = Guid, msg = Msg, is_persistent = IsPersistent, is_delivered = false, msg_on_disk = false, index_on_disk = false, - msg_properties = MsgProperties }. + msg_properties = MsgProperties }. find_msg_store(true) -> ?PERSISTENT_MSG_STORE; find_msg_store(false) -> ?TRANSIENT_MSG_STORE. @@ -1010,7 +1010,7 @@ remove_queue_entries1( %%---------------------------------------------------------------------------- publish(Msg = #basic_message { is_persistent = IsPersistent }, - MsgProperties, IsDelivered, MsgOnDisk, + MsgProperties, IsDelivered, MsgOnDisk, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, next_seq_id = SeqId, len = Len, @@ -1021,7 +1021,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProperties)) #msg_status { is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk}, + msg_on_disk = MsgOnDisk}, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = case bpqueue:is_empty(Q3) of false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) }; @@ -1060,18 +1060,18 @@ maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION {MsgStatus, IndexState}; maybe_write_index_to_disk(Force, MsgStatus = #msg_status { - guid = Guid, + guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_properties = MsgProperties}, - IndexState) + msg_properties = MsgProperties}, + IndexState) when Force orelse IsPersistent -> true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION IndexState1 = rabbit_queue_index:publish(Guid, - SeqId, - MsgProperties, - IsPersistent, + SeqId, + MsgProperties, + IsPersistent, IndexState), {MsgStatus #msg_status { index_on_disk = true }, maybe_write_delivered(IsDelivered, SeqId, IndexState1)}; -- cgit v1.2.1 From 58c6368261c3e3e08a80d4f242c1a67ab0797c64 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Tue, 21 Sep 2010 15:15:26 +0100 Subject: cosmetic changes - line width --- src/rabbit_amqqueue_process.erl | 16 +++++++++++----- src/rabbit_invariable_queue.erl | 4 ++-- src/rabbit_queue_index.erl | 15 +++++++++++---- src/rabbit_tests.erl | 6 ++++-- src/rabbit_variable_queue.erl | 28 +++++++++++++++++++--------- 5 files changed, 47 insertions(+), 22 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6c420ed8..51ea4825 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -427,7 +427,8 @@ attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> record_current_channel_tx(ChPid, Txn), MsgProperties = new_msg_properties(State), - {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, MsgProperties, BQS)}}. + {true, State#q{backing_queue_state = + BQ:tx_publish(Txn, Message, MsgProperties, BQS)}}. deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> case attempt_delivery(Txn, ChPid, Message, State) of @@ -436,7 +437,9 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> {false, NewState} -> %% Txn is none and no unblocked channels with consumers MsgProperties = new_msg_properties(State), - BQS = BQ:publish(Message, MsgProperties, State #q.backing_queue_state), + BQS = BQ:publish(Message, + MsgProperties, + State #q.backing_queue_state), {false, NewState#q{backing_queue_state = BQS}} end. @@ -454,7 +457,8 @@ fetch(AckRequired, State = #q{backing_queue_state = BQS, true -> fetch(AckRequired, State#q{backing_queue_state = BQS1}); false -> - {{Message, IsDelivered, AckTag, Remaining}, State#q{backing_queue_state = BQS1}} + {{Message, IsDelivered, AckTag, Remaining}, + State#q{backing_queue_state = BQS1}} end end. @@ -553,8 +557,10 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - {AckTags, BQS1} = - BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, reset_msg_expiry_fun(State), BQS), + {AckTags, BQS1} = BQ:tx_commit(Txn, + fun () -> gen_server2:reply(From, ok) end, + reset_msg_expiry_fun(State), + BQS), %% ChPid must be known here because of the participant management %% by the channel. C = #cr{acktags = ChAckTags} = lookup_ch(ChPid), diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 14afd767..28471361 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -89,8 +89,8 @@ purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable, %% We do not purge messages pending acks. {AckTags, PA} = rabbit_misc:queue_fold( - fun ({#basic_message { is_persistent = false }, _MsgProps, _IsDelivered}, - Acc) -> + fun ({#basic_message { is_persistent = false }, + _MsgProps, _IsDelivered}, Acc) -> Acc; ({Msg = #basic_message { guid = Guid }, MsgProps, IsDelivered}, {AckTagsN, PAN}) -> diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index ed04c1e1..1af8dd76 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -261,7 +261,8 @@ publish(Guid, SeqId, MsgProperties, IsPersistent, State) end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, create_pub_record_body(Guid, MsgProperties)]), - maybe_flush_journal(add_to_journal(SeqId, {Guid, MsgProperties, IsPersistent}, State1)). + maybe_flush_journal( + add_to_journal(SeqId, {Guid, MsgProperties, IsPersistent}, State1)). deliver(SeqIds, State) -> deliver_or_ack(del, SeqIds, State). @@ -457,7 +458,9 @@ recover_segment(ContainsCheckFun, CleanShutdown, {SegEntries1, UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( - fun (RelSeq, {{Guid, _MsgProperties, _IsPersistent}, Del, no_ack}, Segment1) -> + fun (RelSeq, + {{Guid, _MsgProperties, _IsPersistent}, Del, no_ack}, + Segment1) -> recover_message(ContainsCheckFun(Guid), CleanShutdown, Del, RelSeq, Segment1) end, @@ -510,7 +513,9 @@ queue_index_walker_reader(QueueName, Gatherer) -> State = #qistate { segments = Segments, dir = Dir } = recover_journal(blank_state(QueueName, false)), [ok = segment_entries_foldr( - fun (_RelSeq, {{Guid, _MsgProps, true}, _IsDelivered, no_ack}, ok) -> + fun (_RelSeq, + {{Guid, _MsgProps, true}, _IsDelivered, no_ack}, + ok) -> gatherer:in(Gatherer, {Guid, 1}); (_RelSeq, _Value, Acc) -> Acc @@ -789,7 +794,9 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq}, {Messages, Segments}, Dir) -> Segment = segment_find_or_new(Seg, Dir, Segments), {segment_entries_foldr( - fun (RelSeq, {{Guid, MsgProperties, IsPersistent}, IsDelivered, no_ack}, Acc) + fun (RelSeq, + {{Guid, MsgProperties, IsPersistent}, IsDelivered, no_ack}, + Acc) when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso (Seg < EndSeg orelse EndRelSeq >= RelSeq) -> [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), MsgProperties, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index a5059f87..08ae0d6c 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1664,7 +1664,8 @@ test_queue_index_props() -> Guid = rabbit_guid:guid(), Props = #msg_properties{expiry=12345}, Qi1 = rabbit_queue_index:publish(Guid, 1, Props, true, Qi0), - {[{Guid, 1, Props, _, _}], Qi2} = rabbit_queue_index:read(1, 2, Qi1), + {[{Guid, 1, Props, _, _}], Qi2} = + rabbit_queue_index:read(1, 2, Qi1), Qi2 end), @@ -1877,7 +1878,8 @@ publish_fetch_and_ack(0, _Len, VQ0) -> VQ0; publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), - {{_Msg, _MsgProps, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), + {{_Msg, _MsgProps, false, AckTag, Len}, VQ2} = + rabbit_variable_queue:fetch(true, VQ1), publish_fetch_and_ack(N-1, Len, rabbit_variable_queue:ack([AckTag], VQ2)). test_variable_queue_partial_segments_delta_thing(VQ0) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 5ffd6b61..e0cae48d 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -638,7 +638,8 @@ requeue(AckTags, MsgPropsFun, State) -> fun (#msg_status { msg = Msg, msg_properties = MsgProperties }, State1) -> {_SeqId, State2} = - publish(Msg, MsgPropsFun(MsgProperties), true, false, State1), + publish(Msg, MsgPropsFun(MsgProperties), true, + false, State1), State2; ({IsPersistent, Guid, MsgProperties}, State1) -> #vqstate { msg_store_clients = MSCState } = State1, @@ -794,7 +795,8 @@ one_if(false) -> 0. cons_if(true, E, L) -> [E | L]; cons_if(false, _E, L) -> L. -msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, MsgProperties) -> +msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, + MsgProperties) -> #msg_status { seq_id = SeqId, guid = Guid, msg = Msg, is_persistent = IsPersistent, is_delivered = false, msg_on_disk = false, index_on_disk = false, @@ -834,7 +836,8 @@ erase_tx(Txn) -> erase({txn, Txn}). persistent_guids(Pubs) -> [Guid || - {#basic_message { guid = Guid, is_persistent = true }, _MsgProps} <- Pubs]. + {#basic_message { guid = Guid, is_persistent = true }, + _MsgProps} <- Pubs]. betas_from_index_entries(List, TransientThreshold, IndexState) -> {Filtered, Delivers, Acks} = @@ -927,8 +930,9 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, case IsDurable of true -> [AckTag || AckTag <- AckTags, case dict:fetch(AckTag, PA) of - #msg_status {} -> false; - {IsPersistent, _Guid, _MsgProperties} -> IsPersistent + #msg_status {} -> false; + {IsPersistent, + _Guid, _MsgProps} -> IsPersistent end]; false -> [] end, @@ -960,10 +964,12 @@ tx_commit_index(State = #vqstate { on_sync = #sync { Pubs = lists:append(lists:reverse(SPubs)), {SeqIds, State1 = #vqstate { index_state = IndexState }} = lists:foldl( - fun ({Msg = #basic_message { is_persistent = IsPersistent }, MsgProperties}, + fun ({Msg = #basic_message { is_persistent = IsPersistent }, + MsgProperties}, {SeqIdsAcc, State2}) -> IsPersistent1 = IsDurable andalso IsPersistent, - {SeqId, State3} = publish(Msg, MsgProperties, false, IsPersistent1, State2), + {SeqId, State3} = + publish(Msg, MsgProperties, false, IsPersistent1, State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} end, {PAcks, ack(Acks, State)}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), @@ -1094,7 +1100,9 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, - msg_on_disk = MsgOnDisk, msg_properties = MsgProperties } = MsgStatus, PA) -> + msg_on_disk = MsgOnDisk, + msg_properties = MsgProperties } = MsgStatus, + PA) -> AckEntry = case MsgOnDisk of true -> {IsPersistent, Guid, MsgProperties}; false -> MsgStatus @@ -1149,7 +1157,9 @@ accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, index_on_disk = false }, Acc) -> Acc; -accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProperties}, {SeqIdsAcc, Dict}) -> +accumulate_ack(SeqId, + {IsPersistent, Guid, _MsgProperties}, + {SeqIdsAcc, Dict}) -> {cons_if(IsPersistent, SeqId, SeqIdsAcc), rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}. -- cgit v1.2.1 From 227a399a630962c86b9e88508cb6e5e8693a5afd Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Tue, 21 Sep 2010 15:23:03 +0100 Subject: refactoring argument checking and line width changes --- src/rabbit_amqqueue.erl | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 60e37094..e4fb2587 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -56,8 +56,7 @@ -include("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). --define(EXPIRES_TYPES, [byte, short, signedint, long]). --define(TTL_TYPES, [byte, short, signedint, long]). +-define(INTEGER_ARG_TYPES, [byte, short, signedint, long]). %%---------------------------------------------------------------------------- @@ -310,29 +309,30 @@ check_declare_arguments(QueueName, Args) -> precondition_failed, "invalid arg '~s' for ~s: ~w", [Key, rabbit_misc:rs(QueueName), Error]) - end || {Key, Fun} <- [{<<"x-expires">>, fun check_expires_argument/1}, - {<<"x-message-ttl">>, fun check_message_ttl_argument/1}]], + end || {Key, Fun} <- + [{<<"x-expires">>, fun check_expires_argument/1}, + {<<"x-message-ttl">>, fun check_message_ttl_argument/1}]], ok. -check_expires_argument(undefined) -> - ok; -check_expires_argument({Type, Expires}) when Expires > 0 -> - case lists:member(Type, ?EXPIRES_TYPES) of - true -> ok; - false -> {error, {expires_not_of_acceptable_type, Type, Expires}} - end; -check_expires_argument({_Type, _Expires}) -> - {error, expires_zero_or_less}. +check_expires_argument(Val) -> + check_integer_argument(Val, + expires_not_of_acceptable_type, + expires_zero_or_less). + +check_message_ttl_argument(Val) -> + check_integer_argument(Val, + ttl_not_of_acceptable_type, + ttl_zero_or_less). -check_message_ttl_argument(undefined) -> +check_integer_argument(undefined, _, _) -> ok; -check_message_ttl_argument({Type, TTL}) when TTL > 0 -> - case lists:member(Type, ?TTL_TYPES) of +check_integer_argument({Type, Val}, InvalidTypeError, _) when Val > 0 -> + case lists:member(Type, ?INTEGER_ARG_TYPES) of true -> ok; - false -> {error, {ttl_not_of_acceptable_type, Type, TTL}} + false -> {error, {InvalidTypeError, Type, Value}} end; -check_message_ttl_argument({_Type, _TTL}) -> - {error, ttl_zero_or_less}. +check_message_ttl_argument({_Type, _Value}, _, ZeroOrLessError) -> + {error, ZeroOrLessError}. list(VHostPath) -> mnesia:dirty_match_object( -- cgit v1.2.1 From bf5dd56ae87fc51c95872efd4cd04ff999600661 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Tue, 21 Sep 2010 15:43:16 +0100 Subject: fixed naming error --- src/rabbit_amqqueue.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index e4fb2587..309fd717 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -329,9 +329,9 @@ check_integer_argument(undefined, _, _) -> check_integer_argument({Type, Val}, InvalidTypeError, _) when Val > 0 -> case lists:member(Type, ?INTEGER_ARG_TYPES) of true -> ok; - false -> {error, {InvalidTypeError, Type, Value}} + false -> {error, {InvalidTypeError, Type, Val}} end; -check_message_ttl_argument({_Type, _Value}, _, ZeroOrLessError) -> +check_integer_argument({_Type, _Val}, _, ZeroOrLessError) -> {error, ZeroOrLessError}. list(VHostPath) -> -- cgit v1.2.1 From e0bad5f68e842fe18ae68d10c37f9358f5001371 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Tue, 21 Sep 2010 16:55:23 +0100 Subject: fixed issue in invariable queue preventing per-queue ttl working for transient messages --- src/rabbit_amqqueue_process.erl | 22 ++++++++++++---------- src/rabbit_invariable_queue.erl | 10 +++++----- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 51ea4825..1df0c054 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -413,32 +413,32 @@ run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State), State1. -attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> +attempt_delivery(none, _ChPid, Message, MsgProps, + State = #q{backing_queue = BQ}) -> PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> {AckTag, BQS1} = - BQ:publish_delivered(AckRequired, Message, BQS), + BQ:publish_delivered(AckRequired, Message, MsgProps, BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} end, deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); -attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> +attempt_delivery(Txn, ChPid, Message, MsgProps, + State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> record_current_channel_tx(ChPid, Txn), - MsgProperties = new_msg_properties(State), {true, State#q{backing_queue_state = - BQ:tx_publish(Txn, Message, MsgProperties, BQS)}}. + BQ:tx_publish(Txn, Message, MsgProps, BQS)}}. deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> - case attempt_delivery(Txn, ChPid, Message, State) of + MsgProps = new_msg_properties(State), + case attempt_delivery(Txn, ChPid, Message, MsgProps, State) of {true, NewState} -> {true, NewState}; {false, NewState} -> %% Txn is none and no unblocked channels with consumers - MsgProperties = new_msg_properties(State), BQS = BQ:publish(Message, - MsgProperties, + MsgProps, State #q.backing_queue_state), {false, NewState#q{backing_queue_state = BQS}} end. @@ -718,7 +718,9 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State), + {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, + new_msg_properties(State), + State), reply(Delivered, NewState); handle_call({deliver, Txn, Message, ChPid}, _From, State) -> diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 28471361..2993b325 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -176,8 +176,8 @@ tx_commit(Txn, Fun, MsgPropsFun, PA1 = remove_acks(AckTags1, PA), {Q1, Len1} = lists:foldr(fun ({Msg, MsgProps}, {QN, LenN}) -> MsgProps1 = MsgPropsFun(MsgProps), - QN = enqueue(Msg, MsgProps1, false, Q), - {QN, LenN + 1} + QM = enqueue(Msg, MsgProps1, false, QN), + {QM, LenN + 1} end, {Q, Len}, PubsRev), {AckTags1, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }}. @@ -203,8 +203,7 @@ requeue(AckTags, MsgPropsFun, State = #iv_state { pending_ack = PA, queue = Q, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }. enqueue(Msg, MsgProps, IsDelivered, Q) -> - I = {Msg, MsgProps, IsDelivered}, - queue:in(I, Q). + queue:in({Msg, MsgProps, IsDelivered}, Q). len(#iv_state { len = Len }) -> Len. @@ -280,7 +279,8 @@ persist_acks(QName, true, Txn, AckTags, PA) -> persist_work(Txn, QName, [{ack, {QName, Guid}} || Guid <- AckTags, begin - {ok, Msg} = dict:find(Guid, PA), + {ok, {Msg, _MsgProps}} + = dict:find(Guid, PA), Msg #basic_message.is_persistent end]); persist_acks(_QName, _IsDurable, _Txn, _AckTags, _PA) -> -- cgit v1.2.1 From a30cd8fc6498d7ce6943c0699cc92104fee31fe6 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Tue, 21 Sep 2010 17:17:33 +0100 Subject: cosmetic changes, alignment etc --- src/rabbit_amqqueue_process.erl | 18 ++++++++++-------- src/rabbit_invariable_queue.erl | 42 ++++++++++++++++++++++------------------- 2 files changed, 33 insertions(+), 27 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 1df0c054..f8953852 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -160,8 +160,8 @@ init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) -> init_ttl(State = #q{q = #amqqueue{arguments = Arguments}}) -> case rabbit_misc:table_lookup(Arguments, <<"x-message-ttl">>) of - {_Type, Ttl} -> State#q{ttl=Ttl}; - undefined -> State + {_Type, Ttl} -> State#q{ttl = Ttl}; + undefined -> State end. declare(Recover, From, @@ -444,9 +444,10 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> end. requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> - MsgPropsFun = reset_msg_expiry_fun(State), maybe_run_queue_via_backing_queue( - fun (BQS) -> BQ:requeue(AckTags, MsgPropsFun, BQS) end, State). + fun (BQS) -> + BQ:requeue(AckTags, reset_msg_expiry_fun(State), BQS) + end, State). fetch(AckRequired, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> @@ -454,7 +455,7 @@ fetch(AckRequired, State = #q{backing_queue_state = BQS, {empty, BQS1} -> {empty, State#q{backing_queue_state = BQS1}}; {{Message, MsgProperties, IsDelivered, AckTag, Remaining}, BQS1} -> case msg_expired(MsgProperties) of - true -> + true -> fetch(AckRequired, State#q{backing_queue_state = BQS1}); false -> {{Message, IsDelivered, AckTag, Remaining}, @@ -748,11 +749,12 @@ handle_call({basic_get, ChPid, NoAck}, _From, AckRequired = not NoAck, State1 = ensure_expiry_timer(State), case fetch(AckRequired, State1) of - {empty, State2} -> reply(empty, State2); + {empty, State2} -> + reply(empty, State2); {{Message, IsDelivered, AckTag, Remaining}, State2} -> case AckRequired of - true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), - store_ch_record( + true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), + store_ch_record( C#cr{acktags = sets:add_element(AckTag, ChAckTags)}); false -> ok end, diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 2993b325..d317b55c 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -100,10 +100,10 @@ purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable, ok = persist_acks(QName, IsDurable, none, AckTags, PA), {Len, State #iv_state { len = 0, queue = queue:new() }}. -publish(Msg, MsgProps, State = #iv_state { queue = Q, - qname = QName, - durable = IsDurable, - len = Len }) -> +publish(Msg, MsgProps, State = #iv_state { queue = Q, + qname = QName, + durable = IsDurable, + len = Len }) -> ok = persist_message(QName, IsDurable, none, Msg), Q1 = enqueue(Msg, MsgProps, false, Q), State #iv_state { queue = Q1, len = Len + 1 }. @@ -120,8 +120,10 @@ publish_delivered(true, Msg = #basic_message { guid = Guid }, fetch(_AckRequired, State = #iv_state { len = 0 }) -> {empty, State}; -fetch(AckRequired, State = #iv_state { len = Len, queue = Q, qname = QName, - durable = IsDurable, +fetch(AckRequired, State = #iv_state { len = Len, + queue = Q, + qname = QName, + durable = IsDurable, pending_ack = PA }) -> {{value, {Msg = #basic_message { guid = Guid }, MsgProps, IsDelivered}}, Q1} = queue:out(Q), @@ -164,9 +166,10 @@ tx_rollback(Txn, State = #iv_state { qname = QName }) -> erase_tx(Txn), {lists:flatten(AckTags), State}. -tx_commit(Txn, Fun, MsgPropsFun, - State = #iv_state { qname = QName, pending_ack = PA, - queue = Q, len = Len }) -> +tx_commit(Txn, Fun, MsgPropsFun, State = #iv_state { qname = QName, + pending_ack = PA, + queue = Q, + len = Len }) -> #tx { pending_acks = AckTags, pending_messages = PubsRev } = lookup_tx(Txn), ok = do_if_persistent(fun rabbit_persister:commit_transaction/1, Txn, QName), @@ -175,14 +178,15 @@ tx_commit(Txn, Fun, MsgPropsFun, AckTags1 = lists:flatten(AckTags), PA1 = remove_acks(AckTags1, PA), {Q1, Len1} = lists:foldr(fun ({Msg, MsgProps}, {QN, LenN}) -> - MsgProps1 = MsgPropsFun(MsgProps), - QM = enqueue(Msg, MsgProps1, false, QN), - {QM, LenN + 1} + {enqueue(Msg, MsgPropsFun(MsgProps), + false, QN), + LenN + 1} end, {Q, Len}, PubsRev), {AckTags1, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }}. -requeue(AckTags, MsgPropsFun, State = #iv_state { pending_ack = PA, queue = Q, - len = Len }) -> +requeue(AckTags, MsgPropsFun, State = #iv_state { pending_ack = PA, + queue = Q, + len = Len }) -> %% We don't need to touch the persister here - the persister will %% already have these messages published and delivered as %% necessary. The complication is that the persister's seq_id will @@ -194,10 +198,10 @@ requeue(AckTags, MsgPropsFun, State = #iv_state { pending_ack = PA, queue = Q, %% order to the last known state of our queue, prior to shutdown. {Q1, Len1} = lists:foldl( fun (Guid, {QN, LenN}) -> - {ok, {Msg = #basic_message {}, MsgProps}} - = dict:find(Guid, PA), - MsgProps1 = MsgPropsFun(MsgProps), - {enqueue(Msg, MsgProps1, true, QN), LenN + 1} + {Msg = #basic_message {}, MsgProps} + = dict:fetch(Guid, PA), + {enqueue(Msg, MsgPropsFun(MsgProps), true, QN), + LenN + 1} end, {Q, Len}, AckTags), PA1 = remove_acks(AckTags, PA), State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }. @@ -225,7 +229,7 @@ status(_State) -> []. remove_acks(AckTags, PA) -> lists:foldl(fun dict:erase/2, PA, AckTags). -store_ack(Msg = #basic_message { guid = Guid}, MsgProps, PA) -> +store_ack(Msg = #basic_message { guid = Guid }, MsgProps, PA) -> dict:store(Guid, {Msg, MsgProps}, PA). %%---------------------------------------------------------------------------- -- cgit v1.2.1 From 6696a8a02c1fa2224b439c99432308c32eb5676f Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Tue, 21 Sep 2010 17:37:43 +0100 Subject: stylistic issues, reworking handling of msg_properties --- src/rabbit_amqqueue_process.erl | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f8953852..1aa1d05f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -413,32 +413,31 @@ run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State), State1. -attempt_delivery(none, _ChPid, Message, MsgProps, - State = #q{backing_queue = BQ}) -> +attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> {AckTag, BQS1} = - BQ:publish_delivered(AckRequired, Message, MsgProps, BQS), + BQ:publish_delivered(AckRequired, Message, + #msg_properties{}, BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} end, deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); -attempt_delivery(Txn, ChPid, Message, MsgProps, - State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> +attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> record_current_channel_tx(ChPid, Txn), {true, State#q{backing_queue_state = - BQ:tx_publish(Txn, Message, MsgProps, BQS)}}. + BQ:tx_publish(Txn, Message, #msg_properties{}, BQS)}}. deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> - MsgProps = new_msg_properties(State), - case attempt_delivery(Txn, ChPid, Message, MsgProps, State) of + case attempt_delivery(Txn, ChPid, Message, State) of {true, NewState} -> {true, NewState}; {false, NewState} -> %% Txn is none and no unblocked channels with consumers BQS = BQ:publish(Message, - MsgProps, + msg_properties(State), State #q.backing_queue_state), {false, NewState#q{backing_queue_state = BQS}} end. @@ -591,7 +590,7 @@ reset_msg_expiry_fun(State) -> MsgProps#msg_properties{expiry=calculate_msg_expiry(State)} end. -new_msg_properties(State) -> +msg_properties(State) -> #msg_properties{expiry = calculate_msg_expiry(State)}. calculate_msg_expiry(_State = #q{ttl = undefined}) -> @@ -719,9 +718,10 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, - new_msg_properties(State), - State), + + %% we don't need an expiry here because messages are not being + %% enqueued, so we use an empty msg_properties. + {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State), reply(Delivered, NewState); handle_call({deliver, Txn, Message, ChPid}, _From, State) -> -- cgit v1.2.1 From ba20346219342650668fae0a72d1bb1449a6e9be Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Tue, 21 Sep 2010 18:01:41 +0100 Subject: cosmetic changes --- src/rabbit_queue_index.erl | 7 ++++--- src/rabbit_variable_queue.erl | 23 +++++++++++------------ 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 1af8dd76..6ace4c8b 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -538,11 +538,14 @@ expiry_to_binary(Expiry) -> read_pub_record_body(Hdl) -> {ok, Bin} = file_handle_cache:read(Hdl, ?GUID_BYTES + ?EXPIRY_BYTES), + + %% work around for binary data fragmentation. See + %% rabbit_msg_file:read_next/2 <> = Bin, <> = <>, Exp = case Expiry of ?NO_EXPIRY -> undefined; - X -> X + X -> X end, {Guid, #msg_properties{expiry = Exp}}. @@ -829,8 +832,6 @@ load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) -> case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of {ok, <>} -> - %% because we specify /binary, and binaries are complete - %% bytes, the size spec is in bytes, not bits. {Guid, MsgProperties} = read_pub_record_body(Hdl), Obj = {{Guid, MsgProperties, 1 == IsPersistentNum}, no_del, no_ack}, SegEntries1 = array:set(RelSeq, Obj, SegEntries), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index e0cae48d..6dc24e80 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -613,11 +613,10 @@ tx_rollback(Txn, State = #vqstate { durable = IsDurable }) -> tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable }) -> #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), erase_tx(Txn), - F = fun({Msg, MsgProperties}) -> - {Msg, MsgPropsFun(MsgProperties)} - end, - PubsProcessed = lists:map(F, Pubs), - PubsOrdered = lists:reverse(PubsProcessed), + PubsOrdered = lists:foldl( + fun ({Msg, MsgProps}, Acc) -> + [{Msg, MsgPropsFun(MsgProps)} | Acc] + end, [], Pubs), AckTags1 = lists:append(AckTags), PersistentGuids = persistent_guids(PubsOrdered), HasPersistentPubs = PersistentGuids =/= [], @@ -848,13 +847,13 @@ betas_from_index_entries(List, TransientThreshold, IndexState) -> true -> {Filtered1, cons_if(not IsDelivered, SeqId, Delivers1), [SeqId | Acks1]}; - false -> {[m(#msg_status { msg = undefined, - guid = Guid, - seq_id = SeqId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_on_disk = true, - index_on_disk = true, + false -> {[m(#msg_status { msg = undefined, + guid = Guid, + seq_id = SeqId, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_on_disk = true, + index_on_disk = true, msg_properties = MsgProperties }) | Filtered1], Delivers1, -- cgit v1.2.1 From b37fa7de3c7287613bb65f5362c7343ea8482084 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Tue, 21 Sep 2010 22:38:50 +0100 Subject: persistence of message props working for invariable queue --- src/rabbit_invariable_queue.erl | 13 +++++++------ src/rabbit_persister.erl | 21 +++++++++++---------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index d317b55c..152d2a87 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -104,7 +104,7 @@ publish(Msg, MsgProps, State = #iv_state { queue = Q, qname = QName, durable = IsDurable, len = Len }) -> - ok = persist_message(QName, IsDurable, none, Msg), + ok = persist_message(QName, IsDurable, none, Msg, MsgProps), Q1 = enqueue(Msg, MsgProps, false, Q), State #iv_state { queue = Q1, len = Len + 1 }. @@ -114,7 +114,7 @@ publish_delivered(true, Msg = #basic_message { guid = Guid }, MsgProps, State = #iv_state { qname = QName, durable = IsDurable, len = 0, pending_ack = PA }) -> - ok = persist_message(QName, IsDurable, none, Msg), + ok = persist_message(QName, IsDurable, none, Msg, MsgProps), ok = persist_delivery(QName, IsDurable, false, Msg), {Guid, State #iv_state { pending_ack = store_ack(Msg, MsgProps, PA) }}. @@ -149,7 +149,7 @@ tx_publish(Txn, Msg, MsgProps, State = #iv_state { qname = QName, durable = IsDurable }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }), - ok = persist_message(QName, IsDurable, Txn, Msg), + ok = persist_message(QName, IsDurable, Txn, Msg, MsgProps), State. tx_ack(Txn, AckTags, State = #iv_state { qname = QName, durable = IsDurable, @@ -263,14 +263,15 @@ do_if_persistent(F, Txn, QName) -> %%---------------------------------------------------------------------------- persist_message(QName, true, Txn, Msg = #basic_message { - is_persistent = true }) -> + is_persistent = true }, MsgProps) -> Msg1 = Msg #basic_message { %% don't persist any recoverable decoded properties content = rabbit_binary_parser:clear_decoded_content( Msg #basic_message.content)}, persist_work(Txn, QName, - [{publish, Msg1, {QName, Msg1 #basic_message.guid}}]); -persist_message(_QName, _IsDurable, _Txn, _Msg) -> + [{publish, Msg1, MsgProps, + {QName, Msg1 #basic_message.guid}}]); +persist_message(_QName, _IsDurable, _Txn, _Msg, _MsgProps) -> ok. persist_delivery(QName, true, false, #basic_message { is_persistent = true, diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index 66e5cf63..dad81873 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -173,9 +173,9 @@ handle_call(force_snapshot, _From, State) -> handle_call({queue_content, QName}, _From, State = #pstate{snapshot = #psnapshot{messages = Messages, queues = Queues}}) -> - MatchSpec= [{{{QName,'$1'}, '$2', '$3'}, [], [{{'$3', '$1', '$2'}}]}], - do_reply([{ets:lookup_element(Messages, K, 2), D} || - {_, K, D} <- lists:sort(ets:select(Queues, MatchSpec))], + MatchSpec= [{{{QName,'$1'}, '$2', '$3', '$4'}, [], [{{'$4', '$1', '$2', '$3'}}]}], + do_reply([{ets:lookup_element(Messages, K, 2), MP, D} || + {_, K, D, MP} <- lists:sort(ets:select(Queues, MatchSpec))], State); handle_call(_Request, _From, State) -> {noreply, State}. @@ -243,9 +243,9 @@ log_work(CreateWorkUnit, MessageList, snapshot = Snapshot = #psnapshot{messages = Messages}}) -> Unit = CreateWorkUnit( rabbit_misc:map_in_order( - fun (M = {publish, Message, QK = {_QName, PKey}}) -> + fun (M = {publish, Message, MsgProps, QK = {_QName, PKey}}) -> case ets:lookup(Messages, PKey) of - [_] -> {tied, QK}; + [_] -> {tied, MsgProps, QK}; [] -> ets:insert(Messages, {PKey, Message}), M end; @@ -356,7 +356,8 @@ current_snapshot(_Snapshot = #psnapshot{transactions = Ts, next_seq_id = NextSeqId}) -> %% Avoid infinite growth of the table by removing messages not %% bound to a queue anymore - PKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered, _SeqId}, S) -> + PKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered, + _MsgProps, _SeqId}, S) -> sets:add_element(PKey, S) end, sets:new(), Queues), prune_table(Messages, fun (Key) -> sets:is_element(Key, PKeys) end), @@ -474,14 +475,14 @@ perform_work(MessageList, Messages, Queues, SeqId) -> perform_work_item(Item, Messages, Queues, NextSeqId) end, SeqId, MessageList). -perform_work_item({publish, Message, QK = {_QName, PKey}}, +perform_work_item({publish, Message, MsgProps, QK = {_QName, PKey}}, Messages, Queues, NextSeqId) -> true = ets:insert(Messages, {PKey, Message}), - true = ets:insert(Queues, {QK, false, NextSeqId}), + true = ets:insert(Queues, {QK, false, MsgProps, NextSeqId}), NextSeqId + 1; -perform_work_item({tied, QK}, _Messages, Queues, NextSeqId) -> - true = ets:insert(Queues, {QK, false, NextSeqId}), +perform_work_item({tied, MsgProps, QK}, _Messages, Queues, NextSeqId) -> + true = ets:insert(Queues, {QK, false, MsgProps, NextSeqId}), NextSeqId + 1; perform_work_item({deliver, QK}, _Messages, Queues, NextSeqId) -> -- cgit v1.2.1 From c3cae270c5843d1d83e71fad0110e04a495abec7 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Wed, 22 Sep 2010 12:06:23 +0100 Subject: clean up of specs and dialzer results --- include/rabbit_backing_queue_spec.hrl | 28 ++++++++++++++++++++-------- src/rabbit_queue_index.erl | 30 +++++++++++++++++------------- src/rabbit_types.erl | 5 ++++- 3 files changed, 41 insertions(+), 22 deletions(-) diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 005994f0..f417c6d9 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -30,12 +30,17 @@ %% -type(fetch_result() :: - %% Message, IsDelivered, AckTag, Remaining_Len - ('empty'|{rabbit_types:basic_message(), boolean(), ack(), non_neg_integer()})). + %% Message, MessageProperties, IsDelivered, AckTag, Remaining_Len + ('empty'|{rabbit_types:basic_message(), + rabbit_types:msg_properties(), + boolean(), ack(), + non_neg_integer()})). -type(is_durable() :: boolean()). -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). -type(ack_required() :: boolean()). +-type(msg_properties_transformer() :: + fun ((rabbit_types:msg_properties()) -> rabbit_types:msg_properties())). -spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok'). -spec(stop/0 :: () -> 'ok'). @@ -43,16 +48,23 @@ -spec(terminate/1 :: (state()) -> state()). -spec(delete_and_terminate/1 :: (state()) -> state()). -spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). --spec(publish/2 :: (rabbit_types:basic_message(), state()) -> state()). --spec(publish_delivered/3 :: - (ack_required(), rabbit_types:basic_message(), state()) -> {ack(), state()}). +-spec(publish/3 :: + (rabbit_types:basic_message(), rabbit_types:msg_properties(), state()) + -> state()). +-spec(publish_delivered/4 :: + (ack_required(), rabbit_types:basic_message(), + rabbit_types:msg_properties(), state()) -> {ack(), state()}). -spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}). -spec(ack/2 :: ([ack()], state()) -> state()). --spec(tx_publish/3 :: (rabbit_types:txn(), rabbit_types:basic_message(), state()) -> state()). +-spec(tx_publish/4 :: + (rabbit_types:txn(), rabbit_types:basic_message(), + rabbit_types:msg_properties(), state()) -> state()). -spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()). -spec(tx_rollback/2 :: (rabbit_types:txn(), state()) -> {[ack()], state()}). --spec(tx_commit/3 :: (rabbit_types:txn(), fun (() -> any()), state()) -> {[ack()], state()}). --spec(requeue/2 :: ([ack()], state()) -> state()). +-spec(tx_commit/4 :: + (rabbit_types:txn(), fun (() -> any()), + msg_properties_transformer(), state()) -> {[ack()], state()}). +-spec(requeue/3 :: ([ack()], msg_properties_transformer(), state()) -> state()). -spec(len/1 :: (state()) -> non_neg_integer()). -spec(is_empty/1 :: (state()) -> boolean()). -spec(set_ram_duration_target/2 :: diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 6ace4c8b..820378a5 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -203,14 +203,15 @@ {'undefined' | non_neg_integer(), [any()], qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). --spec(publish/5 :: (rabbit_guid:guid(), seq_id(), msg_properties(), +-spec(publish/5 :: (rabbit_guid:guid(), seq_id(), rabbit_types:msg_properties(), boolean(), qistate()) -> qistate()). -spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()). -spec(ack/2 :: ([seq_id()], qistate()) -> qistate()). -spec(sync/2 :: ([seq_id()], qistate()) -> qistate()). -spec(flush/1 :: (qistate()) -> qistate()). -spec(read/3 :: (seq_id(), seq_id(), qistate()) -> - {[{rabbit_guid:guid(), seq_id(), msg_properties(), + {[{rabbit_guid:guid(), seq_id(), + rabbit_types:msg_properties(), boolean(), boolean()}], qistate()}). -spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). -spec(bounds/1 :: (qistate()) -> @@ -537,17 +538,20 @@ expiry_to_binary(Expiry) -> <>. read_pub_record_body(Hdl) -> - {ok, Bin} = file_handle_cache:read(Hdl, ?GUID_BYTES + ?EXPIRY_BYTES), - - %% work around for binary data fragmentation. See - %% rabbit_msg_file:read_next/2 - <> = Bin, - <> = <>, - Exp = case Expiry of - ?NO_EXPIRY -> undefined; - X -> X - end, - {Guid, #msg_properties{expiry = Exp}}. + case file_handle_cache:read(Hdl, ?GUID_BYTES + ?EXPIRY_BYTES) of + {ok, Bin} -> + %% work around for binary data fragmentation. See + %% rabbit_msg_file:read_next/2 + <> = Bin, + <> = <>, + Exp = case Expiry of + ?NO_EXPIRY -> undefined; + X -> X + end, + {Guid, #msg_properties{expiry = Exp}}; + Error -> + Error + end. %%---------------------------------------------------------------------------- %% journal manipulation diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 0b6a15ec..35f74da3 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -41,7 +41,8 @@ amqp_error/0, r/1, r2/2, r3/3, listener/0, binding/0, amqqueue/0, exchange/0, connection/0, protocol/0, user/0, ok/1, error/1, ok_or_error/1, ok_or_error2/2, - ok_pid_or_error/0, channel_exit/0, connection_exit/0]). + ok_pid_or_error/0, channel_exit/0, connection_exit/0, + msg_properties/0]). -type(channel_exit() :: no_return()). -type(connection_exit() :: no_return()). @@ -86,6 +87,8 @@ txn :: maybe(txn()), sender :: pid(), message :: message()}). +-type(msg_properties() :: + #msg_properties{expiry :: pos_integer()}). %% this is really an abstract type, but dialyzer does not support them -type(txn() :: rabbit_guid:guid()). -- cgit v1.2.1 From 85ce982ea685c47f6546957466f826549452dff0 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Wed, 22 Sep 2010 12:51:59 +0100 Subject: fixed problem in vq:tx_commit, moved expiry recalculation to post_msg_store callback --- src/rabbit_variable_queue.erl | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 6dc24e80..202f2c99 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -613,22 +613,18 @@ tx_rollback(Txn, State = #vqstate { durable = IsDurable }) -> tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable }) -> #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), erase_tx(Txn), - PubsOrdered = lists:foldl( - fun ({Msg, MsgProps}, Acc) -> - [{Msg, MsgPropsFun(MsgProps)} | Acc] - end, [], Pubs), AckTags1 = lists:append(AckTags), - PersistentGuids = persistent_guids(PubsOrdered), + PersistentGuids = persistent_guids(Pubs), HasPersistentPubs = PersistentGuids =/= [], {AckTags1, a(case IsDurable andalso HasPersistentPubs of true -> ok = rabbit_msg_store:sync( ?PERSISTENT_MSG_STORE, PersistentGuids, - msg_store_callback(PersistentGuids, - PubsOrdered, AckTags1, Fun)), + msg_store_callback(PersistentGuids,Pubs, AckTags1, + Fun, MsgPropsFun)), State; - false -> tx_commit_post_msg_store( - HasPersistentPubs, PubsOrdered, AckTags1, Fun, State) + false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1, + Fun, MsgPropsFun, State) end)}. requeue(AckTags, MsgPropsFun, State) -> @@ -901,11 +897,12 @@ update_rate(Now, Then, Count, {OThen, OCount}) -> %% Internal major helpers for Public API %%---------------------------------------------------------------------------- -msg_store_callback(PersistentGuids, Pubs, AckTags, Fun) -> +msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) -> Self = self(), F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( Self, fun (StateN) -> tx_commit_post_msg_store( - true, Pubs, AckTags, Fun, StateN) + true, Pubs, AckTags, + Fun, MsgPropsFun, StateN) end) end, fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler( @@ -916,7 +913,7 @@ msg_store_callback(PersistentGuids, Pubs, AckTags, Fun) -> end) end. -tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, +tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun, State = #vqstate { on_sync = OnSync = #sync { acks_persistent = SPAcks, @@ -935,17 +932,21 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, end]; false -> [] end, + PubsOrdered = lists:foldl( + fun ({Msg, MsgProps}, Acc) -> + [{Msg, MsgPropsFun(MsgProps)} | Acc] + end, [], Pubs), case IsDurable andalso (HasPersistentPubs orelse PersistentAcks =/= []) of true -> State #vqstate { on_sync = #sync { acks_persistent = [PersistentAcks | SPAcks], acks_all = [AckTags | SAcks], - pubs = [Pubs | SPubs], + pubs = [PubsOrdered | SPubs], funs = [Fun | SFuns] }}; false -> State1 = tx_commit_index( State #vqstate { on_sync = #sync { acks_persistent = [], acks_all = [AckTags], - pubs = [Pubs], + pubs = [PubsOrdered], funs = [Fun] } }), State1 #vqstate { on_sync = OnSync } end. -- cgit v1.2.1 From f8174a443dd985bbf0dac128b21c7d5d84a66499 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Wed, 22 Sep 2010 18:37:40 +0100 Subject: reworked how message filtering works with dropwhile --- include/rabbit_backing_queue_spec.hrl | 6 +- src/rabbit_amqqueue_process.erl | 36 +++++----- src/rabbit_backing_queue.erl | 4 ++ src/rabbit_invariable_queue.erl | 7 +- src/rabbit_persister.erl | 2 +- src/rabbit_tests.erl | 8 +-- src/rabbit_variable_queue.erl | 132 ++++++++++++++++++++-------------- 7 files changed, 116 insertions(+), 79 deletions(-) diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index f417c6d9..3e78d571 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -30,9 +30,8 @@ %% -type(fetch_result() :: - %% Message, MessageProperties, IsDelivered, AckTag, Remaining_Len + %% Message, IsDelivered, AckTag, Remaining_Len ('empty'|{rabbit_types:basic_message(), - rabbit_types:msg_properties(), boolean(), ack(), non_neg_integer()})). -type(is_durable() :: boolean()). @@ -54,6 +53,9 @@ -spec(publish_delivered/4 :: (ack_required(), rabbit_types:basic_message(), rabbit_types:msg_properties(), state()) -> {ack(), state()}). +-spec(dropwhile/2 :: + (fun ((rabbit_types:basic_message(), rabbit_types:msg_properties()) + -> boolean()), state()) -> state()). -spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}). -spec(ack/2 :: ([ack()], state()) -> state()). -spec(tx_publish/4 :: diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 1aa1d05f..d92dd586 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -406,9 +406,10 @@ deliver_from_queue_deliver(AckRequired, false, State) -> fetch(AckRequired, State), {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. -run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> +run_message_queue(State) -> Funs = {fun deliver_from_queue_pred/2, fun deliver_from_queue_deliver/3}, + #q{backing_queue = BQ, backing_queue_state = BQS} = drop_expired_messages(State), IsEmpty = BQ:is_empty(BQS), {_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State), State1. @@ -451,17 +452,24 @@ requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> fetch(AckRequired, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> case BQ:fetch(AckRequired, BQS) of - {empty, BQS1} -> {empty, State#q{backing_queue_state = BQS1}}; - {{Message, MsgProperties, IsDelivered, AckTag, Remaining}, BQS1} -> - case msg_expired(MsgProperties) of - true -> - fetch(AckRequired, State#q{backing_queue_state = BQS1}); - false -> - {{Message, IsDelivered, AckTag, Remaining}, - State#q{backing_queue_state = BQS1}} - end + {empty, BQS1} -> + {empty, State#q{backing_queue_state = BQS1}}; + {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> + {{Message, IsDelivered, AckTag, Remaining}, + State#q{backing_queue_state = BQS1}} end. +drop_expired_messages(State = #q{backing_queue_state = BQS, + backing_queue = BQ}) -> + BQS1 = BQ:dropwhile( + fun (_Msg, _MsgProperties = #msg_properties{expiry = undefined}) -> + false; + (_Msg, _MsgProperties = #msg_properties{expiry=Expiry}) -> + Now = timer:now_diff(os:timestamp(), {0,0,0}), + Now > Expiry + end, BQS), + State #q{backing_queue_state = BQS1}. + add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). remove_consumer(ChPid, ConsumerTag, Queue) -> @@ -579,12 +587,6 @@ rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ, subtract_acks(A, B) when is_list(B) -> lists:foldl(fun sets:del_element/2, A, B). -msg_expired(_MsgProperties = #msg_properties{expiry = undefined}) -> - false; -msg_expired(_MsgProperties = #msg_properties{expiry=Expiry}) -> - Now = timer:now_diff(now(), {0,0,0}), - Now > Expiry. - reset_msg_expiry_fun(State) -> fun(MsgProps) -> MsgProps#msg_properties{expiry=calculate_msg_expiry(State)} @@ -748,7 +750,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, State = #q{q = #amqqueue{name = QName}}) -> AckRequired = not NoAck, State1 = ensure_expiry_timer(State), - case fetch(AckRequired, State1) of + case fetch(AckRequired, drop_expired_messages(State1)) of {empty, State2} -> reply(empty, State2); {{Message, IsDelivered, AckTag, Remaining}, State2} -> diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 5cb78368..4f71c1a8 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -69,6 +69,10 @@ behaviour_info(callbacks) -> %% (i.e. saves the round trip through the backing queue). {publish_delivered, 4}, + %% Drop messages in the queue while the supplied predicate + %% returns true and return the new state. + {dropwhile, 2}, + %% Produce the next message. {fetch, 2}, diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 152d2a87..4626b513 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -32,8 +32,8 @@ -module(rabbit_invariable_queue). -export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3, - publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, - tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, + publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, + dropwhile/2, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, status/1]). @@ -118,6 +118,9 @@ publish_delivered(true, Msg = #basic_message { guid = Guid }, ok = persist_delivery(QName, IsDurable, false, Msg), {Guid, State #iv_state { pending_ack = store_ack(Msg, MsgProps, PA) }}. +dropwhile(Pred, State) -> + State. + fetch(_AckRequired, State = #iv_state { len = 0 }) -> {empty, State}; fetch(AckRequired, State = #iv_state { len = Len, diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index dad81873..6c501fc0 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -69,7 +69,7 @@ -type(pmsg() :: {rabbit_amqqueue:name(), pkey()}). -type(work_item() :: - {publish, rabbit_types:message(), pmsg()} | + {publish, rabbit_types:message(), rabbit_types:msg_properties(), pmsg()} | {deliver, pmsg()} | {ack, pmsg()}). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 08ae0d6c..ee2b564d 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1815,7 +1815,7 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> lists:foldl(fun (N, {VQN, AckTagsAcc}) -> Rem = Len - N, {{#basic_message { is_persistent = IsPersistent }, - _Props, IsDelivered, AckTagN, Rem}, VQM} = + IsDelivered, AckTagN, Rem}, VQM} = rabbit_variable_queue:fetch(true, VQN), {VQM, [AckTagN | AckTagsAcc]} end, {VQ, []}, lists:seq(1, Count)). @@ -1878,7 +1878,7 @@ publish_fetch_and_ack(0, _Len, VQ0) -> VQ0; publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), - {{_Msg, _MsgProps, false, AckTag, Len}, VQ2} = + {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), publish_fetch_and_ack(N-1, Len, rabbit_variable_queue:ack([AckTag], VQ2)). @@ -1943,7 +1943,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), VQ7 = rabbit_variable_queue:init(test_queue(), true, true), - {{_Msg1, _Props, true, _AckTag1, Count1}, VQ8} = + {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), VQ9 = variable_queue_publish(false, 1, VQ8), VQ10 = rabbit_variable_queue:set_ram_duration_target(0, VQ9), @@ -1989,7 +1989,7 @@ test_queue_recover() -> rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), VQ1 = rabbit_variable_queue:init(QName, true, true), - {{_Msg1, _Props, true, _AckTag1, CountMinusOne}, VQ2} = + {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), rabbit_amqqueue:internal_delete(QName) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 202f2c99..4df4088c 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -34,7 +34,7 @@ -export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3, publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, - requeue/3, len/1, is_empty/1, + requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, status/1]). @@ -518,64 +518,90 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, persistent_count = PCount1, pending_ack = PA1 })}. -fetch(AckRequired, State = #vqstate { q4 = Q4, - ram_msg_count = RamMsgCount, - out_counter = OutCount, - index_state = IndexState, - len = Len, - persistent_count = PCount, - pending_ack = PA }) -> + +dropwhile(Pred, State) -> + case internal_queue_out( + fun(MsgStatus = #msg_status { msg = Msg, msg_properties = MsgProps }, + Q4a, State1) -> + case Pred(Msg, MsgProps) of + true -> + {_, State2} = internal_fetch(false, Q4a, + MsgStatus, State1), + dropwhile(Pred, State2); + false -> + State1 + end + end, State) of + {empty, State2} -> State2; + State2 -> State2 + end. + +fetch(AckRequired, State) -> + internal_queue_out( + fun(MsgStatus, Q4a, State1) -> + internal_fetch(AckRequired, Q4a, MsgStatus, State1) + end, State). + +internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) -> case queue:out(Q4) of {empty, _Q4} -> case fetch_from_q3_to_q4(State) of - {empty, State1} = Result -> a(State1), Result; - {loaded, State1} -> fetch(AckRequired, State1) + {empty, State1} = Result -> a(State1), Result; + {loaded, State1} -> internal_queue_out(Fun, State1) end; - {{value, MsgStatus = #msg_status { - msg = Msg, guid = Guid, seq_id = SeqId, - is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk, - msg_properties = MsgProperties }}, - Q4a} -> - - %% 1. Mark it delivered if necessary - IndexState1 = maybe_write_delivered( - IndexOnDisk andalso not IsDelivered, - SeqId, IndexState), - - %% 2. Remove from msg_store and queue index, if necessary - MsgStore = find_msg_store(IsPersistent), - Rem = fun () -> ok = rabbit_msg_store:remove(MsgStore, [Guid]) end, - Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, - IndexState2 = - case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of - {false, true, false, _} -> Rem(), IndexState1; - {false, true, true, _} -> Rem(), Ack(); - { true, true, true, false} -> Ack(); - _ -> IndexState1 - end, - - %% 3. If an ack is required, add something sensible to PA - {AckTag, PA1} = case AckRequired of - true -> PA2 = record_pending_ack( - MsgStatus #msg_status { - is_delivered = true }, PA), - {SeqId, PA2}; - false -> {blank_ack, PA} - end, - - PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), - Len1 = Len - 1, - {{Msg, MsgProperties, IsDelivered, AckTag, Len1}, - a(State #vqstate { q4 = Q4a, - ram_msg_count = RamMsgCount - 1, - out_counter = OutCount + 1, - index_state = IndexState2, - len = Len1, - persistent_count = PCount1, - pending_ack = PA1 })} + {{value, Value}, Q4a} -> + %% don't automatically overwrite the state with the popped + %% queue because some callbacks choose to rollback the pop + %% of the message from the queue + Fun(Value, Q4a, State) end. +internal_fetch(AckRequired, Q4a, + MsgStatus = #msg_status { + msg = Msg, guid = Guid, seq_id = SeqId, + is_persistent = IsPersistent, is_delivered = IsDelivered, + msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }, + State = #vqstate { + ram_msg_count = RamMsgCount, out_counter = OutCount, + index_state = IndexState, len = Len, persistent_count = PCount, + pending_ack = PA }) -> + %% 1. Mark it delivered if necessary + IndexState1 = maybe_write_delivered( + IndexOnDisk andalso not IsDelivered, + SeqId, IndexState), + + %% 2. Remove from msg_store and queue index, if necessary + MsgStore = find_msg_store(IsPersistent), + Rem = fun () -> ok = rabbit_msg_store:remove(MsgStore, [Guid]) end, + Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, + IndexState2 = + case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of + {false, true, false, _} -> Rem(), IndexState1; + {false, true, true, _} -> Rem(), Ack(); + { true, true, true, false} -> Ack(); + _ -> IndexState1 + end, + + %% 3. If an ack is required, add something sensible to PA + {AckTag, PA1} = case AckRequired of + true -> PA2 = record_pending_ack( + MsgStatus #msg_status { + is_delivered = true }, PA), + {SeqId, PA2}; + false -> {blank_ack, PA} + end, + + PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), + Len1 = Len - 1, + {{Msg, IsDelivered, AckTag, Len1}, + a(State #vqstate { q4 = Q4a, + ram_msg_count = RamMsgCount - 1, + out_counter = OutCount + 1, + index_state = IndexState2, + len = Len1, + persistent_count = PCount1, + pending_ack = PA1 })}. + ack(AckTags, State) -> a(ack(fun rabbit_msg_store:remove/2, fun (_AckEntry, State1) -> State1 end, -- cgit v1.2.1 From 276a2b22a9d573c2a097b770ed9c06dc28bae061 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Thu, 23 Sep 2010 13:02:58 +0100 Subject: invariable queue has dropwhile implementation now --- src/rabbit_invariable_queue.erl | 33 +++++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 4626b513..8eb6ebbd 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -118,18 +118,31 @@ publish_delivered(true, Msg = #basic_message { guid = Guid }, ok = persist_delivery(QName, IsDurable, false, Msg), {Guid, State #iv_state { pending_ack = store_ack(Msg, MsgProps, PA) }}. -dropwhile(Pred, State) -> - State. +dropwhile(_Pred, State = #iv_state { len = 0 }) -> + State; +dropwhile(Pred, State = #iv_state { queue = Q }) -> + {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q), + case Pred(Msg, MsgProps) of + true -> + {_, State1} = fetch_internal(false, Q1, Msg, MsgProps, IsDelivered, State), + dropwhile(Pred, State1); + false -> + State + end. fetch(_AckRequired, State = #iv_state { len = 0 }) -> {empty, State}; -fetch(AckRequired, State = #iv_state { len = Len, - queue = Q, - qname = QName, - durable = IsDurable, - pending_ack = PA }) -> - {{value, {Msg = #basic_message { guid = Guid }, MsgProps, IsDelivered}}, - Q1} = queue:out(Q), +fetch(AckRequired, State = #iv_state { queue = Q }) -> + {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q), + fetch_internal(AckRequired, Q1, Msg, MsgProps, IsDelivered, State). + +fetch_internal(AckRequired, Q1, + Msg = #basic_message {guid = Guid}, + MsgProps, IsDelivered, + State = #iv_state { len = Len, + qname = QName, + durable = IsDurable, + pending_ack = PA }) -> Len1 = Len - 1, ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), PA1 = store_ack(Msg, MsgProps, PA), @@ -139,7 +152,7 @@ fetch(AckRequired, State = #iv_state { len = Len, [Guid], PA1), {blank_ack, PA} end, - {{Msg, MsgProps, IsDelivered, AckTag, Len1}, + {{Msg, IsDelivered, AckTag, Len1}, State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}. ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable, -- cgit v1.2.1 From a375c5f0692aaaf0bd93f4e1d7a54e98179a5f93 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Thu, 23 Sep 2010 14:04:14 +0100 Subject: cosmetic, tab clean up --- src/rabbit_invariable_queue.erl | 18 +++++++++--------- src/rabbit_persister.erl | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 8eb6ebbd..3b449efa 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -123,11 +123,11 @@ dropwhile(_Pred, State = #iv_state { len = 0 }) -> dropwhile(Pred, State = #iv_state { queue = Q }) -> {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q), case Pred(Msg, MsgProps) of - true -> - {_, State1} = fetch_internal(false, Q1, Msg, MsgProps, IsDelivered, State), - dropwhile(Pred, State1); - false -> - State + true -> + {_, State1} = fetch_internal(false, Q1, Msg, MsgProps, IsDelivered, State), + dropwhile(Pred, State1); + false -> + State end. fetch(_AckRequired, State = #iv_state { len = 0 }) -> @@ -137,9 +137,9 @@ fetch(AckRequired, State = #iv_state { queue = Q }) -> fetch_internal(AckRequired, Q1, Msg, MsgProps, IsDelivered, State). fetch_internal(AckRequired, Q1, - Msg = #basic_message {guid = Guid}, - MsgProps, IsDelivered, - State = #iv_state { len = Len, + Msg = #basic_message {guid = Guid}, + MsgProps, IsDelivered, + State = #iv_state { len = Len, qname = QName, durable = IsDurable, pending_ack = PA }) -> @@ -286,7 +286,7 @@ persist_message(QName, true, Txn, Msg = #basic_message { Msg #basic_message.content)}, persist_work(Txn, QName, [{publish, Msg1, MsgProps, - {QName, Msg1 #basic_message.guid}}]); + {QName, Msg1 #basic_message.guid}}]); persist_message(_QName, _IsDurable, _Txn, _Msg, _MsgProps) -> ok. diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index 6c501fc0..e50d3323 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -357,7 +357,7 @@ current_snapshot(_Snapshot = #psnapshot{transactions = Ts, %% Avoid infinite growth of the table by removing messages not %% bound to a queue anymore PKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered, - _MsgProps, _SeqId}, S) -> + _MsgProps, _SeqId}, S) -> sets:add_element(PKey, S) end, sets:new(), Queues), prune_table(Messages, fun (Key) -> sets:is_element(Key, PKeys) end), -- cgit v1.2.1 From 2fa9b2c4bee835c7f3105d2b2685486fb143fd4e Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Mon, 27 Sep 2010 14:40:26 +0100 Subject: slight improvement to performance when discarding many messages --- src/rabbit_amqqueue_process.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d92dd586..b6578b7e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -459,13 +459,13 @@ fetch(AckRequired, State = #q{backing_queue_state = BQS, State#q{backing_queue_state = BQS1}} end. +drop_expired_messages(State = #q{ttl = undefined}) -> + State; drop_expired_messages(State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> + Now = timer:now_diff(now(), {0,0,0}), BQS1 = BQ:dropwhile( - fun (_Msg, _MsgProperties = #msg_properties{expiry = undefined}) -> - false; - (_Msg, _MsgProperties = #msg_properties{expiry=Expiry}) -> - Now = timer:now_diff(os:timestamp(), {0,0,0}), + fun (_Msg, _MsgProperties = #msg_properties{expiry=Expiry}) -> Now > Expiry end, BQS), State #q{backing_queue_state = BQS1}. -- cgit v1.2.1 From 6b640144895f964e81bdd8d6a234fc3cfb17cf7c Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Mon, 27 Sep 2010 14:46:48 +0100 Subject: minor stylistic improvement --- src/rabbit_amqqueue_process.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b6578b7e..08ce0ed6 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -150,7 +150,8 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- init_queue_state(State) -> - init_expires(init_ttl(State)). + lists:foldr(fun(F, S) -> F(S) end, State, + [fun init_expires/1, fun init_ttl/1]). init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) -> case rabbit_misc:table_lookup(Arguments, <<"x-expires">>) of -- cgit v1.2.1 From 3141b0e9b6edaebf69e02ceacdfec5e45494f23f Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Mon, 27 Sep 2010 16:21:38 +0100 Subject: added peek to backing queue, implemented in vq --- src/rabbit_amqqueue_process.erl | 2 +- src/rabbit_backing_queue.erl | 3 +++ src/rabbit_tests.erl | 58 ++++++++++++++++++++++++++++++++++++++++- src/rabbit_variable_queue.erl | 13 ++++++--- 4 files changed, 71 insertions(+), 5 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 08ce0ed6..955b607f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -150,7 +150,7 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- init_queue_state(State) -> - lists:foldr(fun(F, S) -> F(S) end, State, + lists:foldl(fun(F, S) -> F(S) end, State, [fun init_expires/1, fun init_ttl/1]). init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) -> diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 4f71c1a8..eaabc651 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -73,6 +73,9 @@ behaviour_info(callbacks) -> %% returns true and return the new state. {dropwhile, 2}, + %% Peek at the next message. + {peek, 1}, + %% Produce the next message. {fetch, 2}, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index ee2b564d..bc99af55 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1842,9 +1842,65 @@ test_variable_queue() -> F <- [fun test_variable_queue_dynamic_duration_change/1, fun test_variable_queue_partial_segments_delta_thing/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1, - fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1]], + fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1, + fun test_dropwhile/1, + fun test_peek/1]], passed. +test_dropwhile(VQ0) -> + Count = 10, + + %% add messages with sequential expiry + VQ1 = lists:foldl( + fun (N, VQN) -> + rabbit_variable_queue:publish( + rabbit_basic:message( + rabbit_misc:r(<<>>, exchange, <<>>), + <<>>, #'P_basic'{}, <<>>), + #msg_properties{expiry = N}, VQN) + end, VQ0, lists:seq(1, Count)), + + %% drop the first 5 messages + VQ2 = rabbit_variable_queue:dropwhile( + fun(_Msg, #msg_properties { expiry = Expiry }) -> + Expiry =< 5 + end, VQ1), + + %% fetch five now + VQ3 = lists:foldl(fun (_N, VQN) -> + {{#basic_message{}, _, _, _}, VQM} = + rabbit_variable_queue:fetch(false, VQN), + VQM + end, VQ2, lists:seq(1, 5)), + + %% should be empty now + {empty, VQ4} = rabbit_variable_queue:fetch(false, VQ3), + + VQ4. + +test_peek(VQ0) -> + Expiry = 123, + Body = <<"test">>, + + %% publish message + VQ1 = rabbit_variable_queue:publish(rabbit_basic:message( + rabbit_misc:r(<<>>, exchange, <<>>), + <<>>, #'P_basic'{}, Body), + #msg_properties{ expiry = Expiry }, + VQ0), + + %% take a peek + {{#basic_message{ content = Content }, + #msg_properties { expiry = Expiry}}, VQ2} = + rabbit_variable_queue:peek(VQ1), + + {_, Body} = rabbit_basic:from_content(Content), + + %% should be able to fetch still + {{_Msg, _, _, _}, VQ3} = rabbit_variable_queue:fetch(false, VQ2), + + VQ3. + test_variable_queue_dynamic_duration_change(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 4df4088c..bf1af596 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -34,7 +34,7 @@ -export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3, publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, - requeue/3, len/1, is_empty/1, dropwhile/2, + requeue/3, len/1, is_empty/1, dropwhile/2, peek/1, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, status/1]). @@ -518,7 +518,14 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, persistent_count = PCount1, pending_ack = PA1 })}. - +peek(State) -> + internal_queue_out( + fun(MsgStatus = #msg_status { msg = Msg, msg_properties = MsgProps }, + _, State1) -> + {{Msg, MsgProps}, State1} + end, State). + + dropwhile(Pred, State) -> case internal_queue_out( fun(MsgStatus = #msg_status { msg = Msg, msg_properties = MsgProps }, @@ -601,7 +608,7 @@ internal_fetch(AckRequired, Q4a, len = Len1, persistent_count = PCount1, pending_ack = PA1 })}. - + ack(AckTags, State) -> a(ack(fun rabbit_msg_store:remove/2, fun (_AckEntry, State1) -> State1 end, -- cgit v1.2.1 From 611039e1d4391843e11736cb30f5c313f3c34599 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Mon, 27 Sep 2010 17:04:29 +0100 Subject: added peek to ivq. some tests for peek --- include/rabbit_backing_queue_spec.hrl | 5 ++++- src/rabbit_invariable_queue.erl | 8 +++++++- src/rabbit_tests.erl | 5 ++++- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 3e78d571..6067ac62 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -34,6 +34,8 @@ ('empty'|{rabbit_types:basic_message(), boolean(), ack(), non_neg_integer()})). +-type(peek_result() :: ('empty'|{rabbit_types:basic_message(), + rabbit_types:msg_properties()})). -type(is_durable() :: boolean()). -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). @@ -55,7 +57,8 @@ rabbit_types:msg_properties(), state()) -> {ack(), state()}). -spec(dropwhile/2 :: (fun ((rabbit_types:basic_message(), rabbit_types:msg_properties()) - -> boolean()), state()) -> state()). + -> boolean()), state()) -> state()). +-spec(peek/1 :: (state()) -> {peek_result(), state()}). -spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}). -spec(ack/2 :: ([ack()], state()) -> state()). -spec(tx_publish/4 :: diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 3b449efa..59c678c9 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -32,7 +32,7 @@ -module(rabbit_invariable_queue). -export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3, - publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, + publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, peek/1, dropwhile/2, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, status/1]). @@ -118,6 +118,12 @@ publish_delivered(true, Msg = #basic_message { guid = Guid }, ok = persist_delivery(QName, IsDurable, false, Msg), {Guid, State #iv_state { pending_ack = store_ack(Msg, MsgProps, PA) }}. +peek(State = #iv_state { len = 0 }) -> + {empty, State}; +peek(State = #iv_state { queue = Q}) -> + {value, {Msg, MsgProps, _IsDelivered}} = queue:peek(Q), + {{Msg, MsgProps}, State}. + dropwhile(_Pred, State = #iv_state { len = 0 }) -> State; dropwhile(Pred, State = #iv_state { queue = Q }) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index bc99af55..37b0916b 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1899,7 +1899,10 @@ test_peek(VQ0) -> %% should be able to fetch still {{_Msg, _, _, _}, VQ3} = rabbit_variable_queue:fetch(false, VQ2), - VQ3. + %% should be empty now + {empty, VQ4} = rabbit_variable_queue:peek(VQ3), + + VQ4. test_variable_queue_dynamic_duration_change(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), -- cgit v1.2.1 From d7d7d70cc8cd70a894e3802e4ffe5022ad236c1a Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Tue, 28 Sep 2010 12:20:27 +0100 Subject: cosmetic --- src/rabbit_invariable_queue.erl | 3 ++- src/rabbit_persister.erl | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 59c678c9..b62544fa 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -130,7 +130,8 @@ dropwhile(Pred, State = #iv_state { queue = Q }) -> {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q), case Pred(Msg, MsgProps) of true -> - {_, State1} = fetch_internal(false, Q1, Msg, MsgProps, IsDelivered, State), + {_, State1} = + fetch_internal(false, Q1, Msg, MsgProps, IsDelivered, State), dropwhile(Pred, State1); false -> State diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index e50d3323..513b14df 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -173,7 +173,8 @@ handle_call(force_snapshot, _From, State) -> handle_call({queue_content, QName}, _From, State = #pstate{snapshot = #psnapshot{messages = Messages, queues = Queues}}) -> - MatchSpec= [{{{QName,'$1'}, '$2', '$3', '$4'}, [], [{{'$4', '$1', '$2', '$3'}}]}], + MatchSpec= [{{{QName,'$1'}, '$2', '$3', '$4'}, [], + [{{'$4', '$1', '$2', '$3'}}]}], do_reply([{ets:lookup_element(Messages, K, 2), MP, D} || {_, K, D, MP} <- lists:sort(ets:select(Queues, MatchSpec))], State); -- cgit v1.2.1 From fd3581c6165e7e6356789f295d4910a6fc0330d3 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Tue, 28 Sep 2010 13:31:37 +0100 Subject: added timer to collect expired messages --- src/rabbit_amqqueue_process.erl | 46 +++++++++++++++++++++++++++++------------ 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 955b607f..52663f15 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -64,7 +64,8 @@ rate_timer_ref, expiry_timer_ref, stats_timer, - ttl + ttl, + ttl_timer_ref }). -record(consumer, {tag, ack_required}). @@ -441,7 +442,7 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> BQS = BQ:publish(Message, msg_properties(State), State #q.backing_queue_state), - {false, NewState#q{backing_queue_state = BQS}} + {false, ensure_ttl_timer(NewState#q{backing_queue_state = BQS})} end. requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> @@ -460,17 +461,6 @@ fetch(AckRequired, State = #q{backing_queue_state = BQS, State#q{backing_queue_state = BQS1}} end. -drop_expired_messages(State = #q{ttl = undefined}) -> - State; -drop_expired_messages(State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> - Now = timer:now_diff(now(), {0,0,0}), - BQS1 = BQ:dropwhile( - fun (_Msg, _MsgProperties = #msg_properties{expiry=Expiry}) -> - Now > Expiry - end, BQS), - State #q{backing_queue_state = BQS1}. - add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). remove_consumer(ChPid, ConsumerTag, Queue) -> @@ -602,6 +592,33 @@ calculate_msg_expiry(_State = #q{ttl = Ttl}) -> Now = timer:now_diff(now(), {0,0,0}), Now + (Ttl * 1000). +drop_expired_messages(State = #q{ttl = undefined}) -> + State; +drop_expired_messages(State = #q{backing_queue_state = BQS, + backing_queue = BQ}) -> + Now = timer:now_diff(now(), {0,0,0}), + BQS1 = BQ:dropwhile( + fun (_Msg, _MsgProperties = #msg_properties{expiry=Expiry}) -> + Now > Expiry + end, BQS), + ensure_ttl_timer(State #q{backing_queue_state = BQS1}). + +ensure_ttl_timer(State = #q{backing_queue = BQ, + backing_queue_state = BQS, + ttl = Ttl, + ttl_timer_ref = undefined}) + when Ttl =/= undefined-> + case BQ:is_empty(BQS) of + true -> + State; + false -> + State#q{ttl_timer_ref = + timer:send_after(Ttl, self(), drop_expired)} + end; +ensure_ttl_timer(State) -> + State. + + infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(name, #q{q = #amqqueue{name = Name}}) -> Name; @@ -993,6 +1010,9 @@ handle_info(timeout, State = #q{backing_queue = BQ}) -> noreply(maybe_run_queue_via_backing_queue( fun (BQS) -> BQ:idle_timeout(BQS) end, State)); +handle_info(drop_expired, State) -> + noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined})); + handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; -- cgit v1.2.1 From b80dff2227f4159df59bbb9691b96dd2c30e2072 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Tue, 28 Sep 2010 16:45:58 +0100 Subject: removed peek, and restructured dropwhile to not load message content from disk --- include/rabbit_backing_queue_spec.hrl | 5 +- src/rabbit_amqqueue_process.erl | 5 +- src/rabbit_backing_queue.erl | 3 - src/rabbit_invariable_queue.erl | 10 +-- src/rabbit_tests.erl | 31 +--------- src/rabbit_variable_queue.erl | 112 ++++++++++++++++++---------------- 6 files changed, 69 insertions(+), 97 deletions(-) diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 6067ac62..f750fbb2 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -56,9 +56,8 @@ (ack_required(), rabbit_types:basic_message(), rabbit_types:msg_properties(), state()) -> {ack(), state()}). -spec(dropwhile/2 :: - (fun ((rabbit_types:basic_message(), rabbit_types:msg_properties()) - -> boolean()), state()) -> state()). --spec(peek/1 :: (state()) -> {peek_result(), state()}). + (fun ((rabbit_types:msg_properties()) -> boolean()), state()) + -> state()). -spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}). -spec(ack/2 :: ([ack()], state()) -> state()). -spec(tx_publish/4 :: diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 52663f15..4b4153e0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -411,7 +411,8 @@ deliver_from_queue_deliver(AckRequired, false, State) -> run_message_queue(State) -> Funs = {fun deliver_from_queue_pred/2, fun deliver_from_queue_deliver/3}, - #q{backing_queue = BQ, backing_queue_state = BQS} = drop_expired_messages(State), + #q{backing_queue = BQ, backing_queue_state = BQS} = + drop_expired_messages(State), IsEmpty = BQ:is_empty(BQS), {_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State), State1. @@ -598,7 +599,7 @@ drop_expired_messages(State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> Now = timer:now_diff(now(), {0,0,0}), BQS1 = BQ:dropwhile( - fun (_Msg, _MsgProperties = #msg_properties{expiry=Expiry}) -> + fun (_MsgProperties = #msg_properties{expiry=Expiry}) -> Now > Expiry end, BQS), ensure_ttl_timer(State #q{backing_queue_state = BQS1}). diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index eaabc651..4f71c1a8 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -73,9 +73,6 @@ behaviour_info(callbacks) -> %% returns true and return the new state. {dropwhile, 2}, - %% Peek at the next message. - {peek, 1}, - %% Produce the next message. {fetch, 2}, diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index b62544fa..feb7c7e1 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -32,7 +32,7 @@ -module(rabbit_invariable_queue). -export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3, - publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, peek/1, + publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, dropwhile/2, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, status/1]). @@ -118,17 +118,11 @@ publish_delivered(true, Msg = #basic_message { guid = Guid }, ok = persist_delivery(QName, IsDurable, false, Msg), {Guid, State #iv_state { pending_ack = store_ack(Msg, MsgProps, PA) }}. -peek(State = #iv_state { len = 0 }) -> - {empty, State}; -peek(State = #iv_state { queue = Q}) -> - {value, {Msg, MsgProps, _IsDelivered}} = queue:peek(Q), - {{Msg, MsgProps}, State}. - dropwhile(_Pred, State = #iv_state { len = 0 }) -> State; dropwhile(Pred, State = #iv_state { queue = Q }) -> {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q), - case Pred(Msg, MsgProps) of + case Pred(MsgProps) of true -> {_, State1} = fetch_internal(false, Q1, Msg, MsgProps, IsDelivered, State), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 37b0916b..430a79d9 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1843,8 +1843,7 @@ test_variable_queue() -> fun test_variable_queue_partial_segments_delta_thing/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1, - fun test_dropwhile/1, - fun test_peek/1]], + fun test_dropwhile/1]], passed. test_dropwhile(VQ0) -> @@ -1862,7 +1861,7 @@ test_dropwhile(VQ0) -> %% drop the first 5 messages VQ2 = rabbit_variable_queue:dropwhile( - fun(_Msg, #msg_properties { expiry = Expiry }) -> + fun(#msg_properties { expiry = Expiry }) -> Expiry =< 5 end, VQ1), @@ -1878,32 +1877,6 @@ test_dropwhile(VQ0) -> VQ4. -test_peek(VQ0) -> - Expiry = 123, - Body = <<"test">>, - - %% publish message - VQ1 = rabbit_variable_queue:publish(rabbit_basic:message( - rabbit_misc:r(<<>>, exchange, <<>>), - <<>>, #'P_basic'{}, Body), - #msg_properties{ expiry = Expiry }, - VQ0), - - %% take a peek - {{#basic_message{ content = Content }, - #msg_properties { expiry = Expiry}}, VQ2} = - rabbit_variable_queue:peek(VQ1), - - {_, Body} = rabbit_basic:from_content(Content), - - %% should be able to fetch still - {{_Msg, _, _, _}, VQ3} = rabbit_variable_queue:fetch(false, VQ2), - - %% should be empty now - {empty, VQ4} = rabbit_variable_queue:peek(VQ3), - - VQ4. - test_variable_queue_dynamic_duration_change(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index bf1af596..7d584026 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -34,7 +34,7 @@ -export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3, publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, - requeue/3, len/1, is_empty/1, dropwhile/2, peek/1, + requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, status/1]). @@ -517,53 +517,71 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, in_counter = InCount + 1, persistent_count = PCount1, pending_ack = PA1 })}. - -peek(State) -> - internal_queue_out( - fun(MsgStatus = #msg_status { msg = Msg, msg_properties = MsgProps }, - _, State1) -> - {{Msg, MsgProps}, State1} - end, State). - dropwhile(Pred, State) -> case internal_queue_out( - fun(MsgStatus = #msg_status { msg = Msg, msg_properties = MsgProps }, - Q4a, State1) -> - case Pred(Msg, MsgProps) of + fun(MsgStatus = #msg_status { msg_properties = MsgProps }, + State1) -> + case Pred(MsgProps) of true -> - {_, State2} = internal_fetch(false, Q4a, - MsgStatus, State1), + {_, State2} = internal_fetch(false, + MsgStatus, State1), dropwhile(Pred, State2); false -> - State1 + %% message needs to go back into Q4 (or + %% maybe go in for the first time if it was + %% loaded from Q3). Also the msg contents + %% might not be in RAM, so read them in now + {MsgStatus1, State2 = #vqstate { q4 = Q4 }} = + read_msg(MsgStatus, State1), + State2 #vqstate {q4 = queue:in_r(MsgStatus1, Q4)} end end, State) of - {empty, State2} -> State2; - State2 -> State2 + {empty, StateR} -> StateR; + StateR -> StateR end. fetch(AckRequired, State) -> internal_queue_out( - fun(MsgStatus, Q4a, State1) -> - internal_fetch(AckRequired, Q4a, MsgStatus, State1) + fun(MsgStatus, State1) -> + %% it's possible that the message wasn't read from disk + %% at this point, so read it in. + {MsgStatus1, State2} = read_msg(MsgStatus, State1), + internal_fetch(AckRequired, MsgStatus1, State2) end, State). internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) -> case queue:out(Q4) of {empty, _Q4} -> - case fetch_from_q3_to_q4(State) of - {empty, State1} = Result -> a(State1), Result; - {loaded, State1} -> internal_queue_out(Fun, State1) + case fetch_from_q3(State) of + {empty, State1} = Result -> a(State1), Result; + {loaded, {MsgStatus, State1}} -> Fun(MsgStatus, State1) end; - {{value, Value}, Q4a} -> - %% don't automatically overwrite the state with the popped - %% queue because some callbacks choose to rollback the pop - %% of the message from the queue - Fun(Value, Q4a, State) + {{value, MsgStatus}, Q4a} -> + Fun(MsgStatus, State #vqstate { q4 = Q4a }) end. -internal_fetch(AckRequired, Q4a, +read_msg(MsgStatus = #msg_status { msg = undefined, + guid = Guid, + index_on_disk = IndexOnDisk, + is_persistent = IsPersistent }, + State = #vqstate { ram_msg_count = RamMsgCount, + ram_index_count = RamIndexCount, + msg_store_clients = MSCState}) -> + {{ok, Msg = #basic_message {}}, MSCState1} = + read_from_msg_store(MSCState, IsPersistent, Guid), + + RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk), + true = RamIndexCount1 >= 0, %% ASSERTION + + {MsgStatus #msg_status { msg = Msg }, + State #vqstate { ram_msg_count = RamMsgCount + 1, + ram_index_count = RamIndexCount1, + msg_store_clients = MSCState1 }}; +read_msg(MsgStatus, State) -> + {MsgStatus, State}. + +internal_fetch(AckRequired, MsgStatus = #msg_status { msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, @@ -601,8 +619,7 @@ internal_fetch(AckRequired, Q4a, PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), Len1 = Len - 1, {{Msg, IsDelivered, AckTag, Len1}, - a(State #vqstate { q4 = Q4a, - ram_msg_count = RamMsgCount - 1, + a(State #vqstate { ram_msg_count = RamMsgCount - 1, out_counter = OutCount + 1, index_state = IndexState2, len = Len1, @@ -1288,40 +1305,31 @@ chunk_size(Current, Permitted) chunk_size(Current, Permitted) -> lists:min([Current - Permitted, ?IO_BATCH_SIZE]). -fetch_from_q3_to_q4(State = #vqstate { +fetch_from_q3(State = #vqstate { q1 = Q1, q2 = Q2, delta = #delta { count = DeltaCount }, q3 = Q3, - q4 = Q4, - ram_msg_count = RamMsgCount, - ram_index_count = RamIndexCount, - msg_store_clients = MSCState }) -> + q4 = Q4 }) -> case bpqueue:out(Q3) of {empty, _Q3} -> {empty, State}; - {{value, IndexOnDisk, MsgStatus = #msg_status { - msg = undefined, guid = Guid, - is_persistent = IsPersistent }}, Q3a} -> - {{ok, Msg = #basic_message {}}, MSCState1} = - read_from_msg_store(MSCState, IsPersistent, Guid), - Q4a = queue:in(m(MsgStatus #msg_status { msg = Msg }), Q4), - RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk), - true = RamIndexCount1 >= 0, %% ASSERTION - State1 = State #vqstate { q3 = Q3a, - q4 = Q4a, - ram_msg_count = RamMsgCount + 1, - ram_index_count = RamIndexCount1, - msg_store_clients = MSCState1 }, + {{value, _IndexOnDisk, MsgStatus}, Q3a} -> + + State1 = State #vqstate { q3 = Q3a}, + State2 = case {bpqueue:is_empty(Q3a), 0 == DeltaCount} of {true, true} -> %% q3 is now empty, it wasn't before; delta is - %% still empty. So q2 must be empty, and q1 - %% can now be joined onto q4 + %% still empty. So q2 must be empty, and we + %% know q4 is empty otherwise we wouldn't be + %% loading from q3. As such, we can just set + %% q4 to Q1. true = bpqueue:is_empty(Q2), %% ASSERTION + true = queue:is_empty(Q4), %% ASSERTION State1 #vqstate { q1 = queue:new(), - q4 = queue:join(Q4a, Q1) }; + q4 = Q1 }; {true, false} -> maybe_deltas_to_betas(State1); {false, _} -> @@ -1330,7 +1338,7 @@ fetch_from_q3_to_q4(State = #vqstate { %% delta and q3 are maintained State1 end, - {loaded, State2} + {loaded, {MsgStatus, State2}} end. maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) -> -- cgit v1.2.1 From cd147162813f8c5c53cf401ebfd51ad52ef9cb98 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Wed, 29 Sep 2010 14:04:20 +0100 Subject: fixed bug with queue length and collect during run --- src/rabbit_amqqueue_process.erl | 6 +++--- src/rabbit_variable_queue.erl | 7 ++++++- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b2519b7a..a92d136b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -411,11 +411,11 @@ deliver_from_queue_deliver(AckRequired, false, State) -> run_message_queue(State) -> Funs = {fun deliver_from_queue_pred/2, fun deliver_from_queue_deliver/3}, - #q{backing_queue = BQ, backing_queue_state = BQS} = + State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = drop_expired_messages(State), IsEmpty = BQ:is_empty(BQS), - {_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State), - State1. + {_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1), + State2. attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> PredFun = fun (IsEmpty, _State) -> not IsEmpty end, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 7d584026..31ec007e 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -618,8 +618,13 @@ internal_fetch(AckRequired, PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), Len1 = Len - 1, + + RamMsgCount1 = case Msg =:= undefined of + true -> RamMsgCount; + false -> RamMsgCount - 1 + end, {{Msg, IsDelivered, AckTag, Len1}, - a(State #vqstate { ram_msg_count = RamMsgCount - 1, + a(State #vqstate { ram_msg_count = RamMsgCount1, out_counter = OutCount + 1, index_state = IndexState2, len = Len1, -- cgit v1.2.1 From bdcc260598eeb2cf65fdb4b4f9bcf67f85b9d86b Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Wed, 29 Sep 2010 14:24:37 +0100 Subject: refactoring --- src/rabbit_amqqueue_process.erl | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a92d136b..d4331cbb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -590,14 +590,13 @@ msg_properties(State) -> calculate_msg_expiry(_State = #q{ttl = undefined}) -> undefined; calculate_msg_expiry(_State = #q{ttl = Ttl}) -> - Now = timer:now_diff(now(), {0,0,0}), - Now + (Ttl * 1000). + now_millis() + (Ttl * 1000). drop_expired_messages(State = #q{ttl = undefined}) -> State; drop_expired_messages(State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> - Now = timer:now_diff(now(), {0,0,0}), + Now = now_millis(), BQS1 = BQ:dropwhile( fun (_MsgProperties = #msg_properties{expiry=Expiry}) -> Now > Expiry @@ -618,6 +617,9 @@ ensure_ttl_timer(State = #q{backing_queue = BQ, end; ensure_ttl_timer(State) -> State. + +now_millis() -> + timer:now_diff(now(), {0,0,0}). infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. -- cgit v1.2.1 From 7a92677535029b49e18bfb5828b5f065628d74f7 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Wed, 29 Sep 2010 14:43:58 +0100 Subject: casing change and added ttl to the list of queue items in rabbitmqctl --- src/rabbit_amqqueue_process.erl | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d4331cbb..2b6850f5 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -98,7 +98,8 @@ durable, auto_delete, arguments, - owner_pid + owner_pid, + ttl ]). -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). @@ -162,7 +163,7 @@ init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) -> init_ttl(State = #q{q = #amqqueue{arguments = Arguments}}) -> case rabbit_misc:table_lookup(Arguments, <<"x-message-ttl">>) of - {_Type, Ttl} -> State#q{ttl = Ttl}; + {_Type, TTL} -> State#q{ttl = TTL}; undefined -> State end. @@ -589,8 +590,8 @@ msg_properties(State) -> calculate_msg_expiry(_State = #q{ttl = undefined}) -> undefined; -calculate_msg_expiry(_State = #q{ttl = Ttl}) -> - now_millis() + (Ttl * 1000). +calculate_msg_expiry(_State = #q{ttl = TTL}) -> + now_millis() + (TTL * 1000). drop_expired_messages(State = #q{ttl = undefined}) -> State; @@ -605,15 +606,15 @@ drop_expired_messages(State = #q{backing_queue_state = BQS, ensure_ttl_timer(State = #q{backing_queue = BQ, backing_queue_state = BQS, - ttl = Ttl, + ttl = TTL, ttl_timer_ref = undefined}) - when Ttl =/= undefined-> + when TTL =/= undefined-> case BQ:is_empty(BQS) of true -> State; false -> State#q{ttl_timer_ref = - timer:send_after(Ttl, self(), drop_expired)} + timer:send_after(TTL, self(), drop_expired)} end; ensure_ttl_timer(State) -> State. @@ -656,6 +657,8 @@ i(memory, _) -> M; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); +i(ttl, #q{ttl = TTL}) -> + TTL; i(Item, _) -> throw({bad_argument, Item}). -- cgit v1.2.1 From a692dd448ef31fd8f58160a1ba8cbe743e16d2ed Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 6 Oct 2010 15:29:53 +0100 Subject: msg_properties -> message_properties in order to be consistent with message and basic_message (though within vq, we have plenty of msg_-prefixes so don't bother inside the msg_status record in there). Also, tidied up a lot of trailing whitespace --- include/rabbit.hrl | 2 +- include/rabbit_backing_queue_spec.hrl | 29 ++++++------- src/rabbit_amqqueue_process.erl | 66 +++++++++++++++--------------- src/rabbit_persister.erl | 3 +- src/rabbit_queue_index.erl | 33 +++++++-------- src/rabbit_tests.erl | 22 +++++----- src/rabbit_types.erl | 6 +-- src/rabbit_variable_queue.erl | 76 +++++++++++++++++------------------ 8 files changed, 120 insertions(+), 117 deletions(-) diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 0f0a0e87..f924878d 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -74,7 +74,7 @@ -record(event, {type, props, timestamp}). --record(msg_properties, {expiry}). +-record(message_properties, {expiry}). %%---------------------------------------------------------------------------- diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 12d32363..dcff5b37 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -34,13 +34,14 @@ %% Message, IsDelivered, AckTag, Remaining_Len {rabbit_types:basic_message(), boolean(), ack(), non_neg_integer()})). -type(peek_result() :: ('empty'|{rabbit_types:basic_message(), - rabbit_types:msg_properties()})). + rabbit_types:message_properties()})). -type(is_durable() :: boolean()). -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). -type(ack_required() :: boolean()). --type(msg_properties_transformer() :: - fun ((rabbit_types:msg_properties()) -> rabbit_types:msg_properties())). +-type(message_properties_transformer() :: + fun ((rabbit_types:message_properties()) + -> rabbit_types:message_properties())). -spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok'). -spec(stop/0 :: () -> 'ok'). @@ -49,26 +50,26 @@ -spec(terminate/1 :: (state()) -> state()). -spec(delete_and_terminate/1 :: (state()) -> state()). -spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). --spec(publish/3 :: - (rabbit_types:basic_message(), rabbit_types:msg_properties(), state()) - -> state()). +-spec(publish/3 :: (rabbit_types:basic_message(), + rabbit_types:message_properties_properties(), state()) + -> state()). -spec(publish_delivered/4 :: (ack_required(), rabbit_types:basic_message(), - rabbit_types:msg_properties(), state()) - -> {ack(), state()}). + rabbit_types:message_properties(), state()) + -> {ack(), state()}). -spec(dropwhile/2 :: - (fun ((rabbit_types:msg_properties()) -> boolean()), state()) + (fun ((rabbit_types:message_properties()) -> boolean()), state()) -> state()). -spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}). -spec(ack/2 :: ([ack()], state()) -> state()). --spec(tx_publish/4 :: - (rabbit_types:txn(), rabbit_types:basic_message(), - rabbit_types:msg_properties(), state()) -> state()). +-spec(tx_publish/4 :: (rabbit_types:txn(), rabbit_types:basic_message(), + rabbit_types:message_properties(), state()) -> state()). -spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()). -spec(tx_rollback/2 :: (rabbit_types:txn(), state()) -> {[ack()], state()}). -spec(tx_commit/4 :: (rabbit_types:txn(), fun (() -> any()), - msg_properties_transformer(), state()) -> {[ack()], state()}). --spec(requeue/3 :: ([ack()], msg_properties_transformer(), state()) -> state()). + message_properties_transformer(), state()) -> {[ack()], state()}). +-spec(requeue/3 :: ([ack()], message_properties_transformer(), state()) + -> state()). -spec(len/1 :: (state()) -> non_neg_integer()). -spec(is_empty/1 :: (state()) -> boolean()). -spec(set_ram_duration_target/2 :: diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 53b98490..91d3f586 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -39,7 +39,7 @@ -define(SYNC_INTERVAL, 5). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). --define(BASE_MSG_PROPERTIES, #msg_properties{expiry = undefined}). +-define(BASE_MESSAGE_PROPERTIES, #message_properties{expiry = undefined}). -export([start_link/1, info_keys/0]). @@ -152,7 +152,7 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- init_queue_state(State) -> - lists:foldl(fun(F, S) -> F(S) end, State, + lists:foldl(fun(F, S) -> F(S) end, State, [fun init_expires/1, fun init_ttl/1]). init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) -> @@ -413,7 +413,7 @@ deliver_from_queue_deliver(AckRequired, false, State) -> run_message_queue(State) -> Funs = {fun deliver_from_queue_pred/2, fun deliver_from_queue_deliver/3}, - State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = + State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = drop_expired_messages(State), IsEmpty = BQ:is_empty(BQS), {_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1), @@ -424,17 +424,18 @@ attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> DeliverFun = fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> {AckTag, BQS1} = - BQ:publish_delivered(AckRequired, Message, - #msg_properties{}, BQS), + BQ:publish_delivered(AckRequired, Message, + #message_properties{}, BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} end, deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); -attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, +attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> record_current_channel_tx(ChPid, Txn), - {true, State#q{backing_queue_state = - BQ:tx_publish(Txn, Message, #msg_properties{}, BQS)}}. + {true, + State#q{backing_queue_state = + BQ:tx_publish(Txn, Message, #message_properties{}, BQS)}}. deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> case attempt_delivery(Txn, ChPid, Message, State) of @@ -442,25 +443,25 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> {true, NewState}; {false, NewState} -> %% Txn is none and no unblocked channels with consumers - BQS = BQ:publish(Message, - msg_properties(State), + BQS = BQ:publish(Message, + message_properties(State), State #q.backing_queue_state), {false, ensure_ttl_timer(NewState#q{backing_queue_state = BQS})} end. requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> maybe_run_queue_via_backing_queue( - fun (BQS) -> + fun (BQS) -> BQ:requeue(AckTags, reset_msg_expiry_fun(State), BQS) end, State). -fetch(AckRequired, State = #q{backing_queue_state = BQS, +fetch(AckRequired, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> case BQ:fetch(AckRequired, BQS) of - {empty, BQS1} -> + {empty, BQS1} -> {empty, State#q{backing_queue_state = BQS1}}; {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> - {{Message, IsDelivered, AckTag, Remaining}, + {{Message, IsDelivered, AckTag, Remaining}, State#q{backing_queue_state = BQS1}} end. @@ -559,9 +560,9 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - {AckTags, BQS1} = BQ:tx_commit(Txn, - fun () -> gen_server2:reply(From, ok) end, - reset_msg_expiry_fun(State), + {AckTags, BQS1} = BQ:tx_commit(Txn, + fun () -> gen_server2:reply(From, ok) end, + reset_msg_expiry_fun(State), BQS), %% ChPid must be known here because of the participant management %% by the channel. @@ -583,38 +584,38 @@ subtract_acks(A, B) when is_list(B) -> reset_msg_expiry_fun(State) -> fun(MsgProps) -> - MsgProps#msg_properties{expiry=calculate_msg_expiry(State)} + MsgProps#message_properties{expiry = calculate_msg_expiry(State)} end. -msg_properties(State) -> - #msg_properties{expiry = calculate_msg_expiry(State)}. +message_properties(State) -> + #message_properties{expiry = calculate_msg_expiry(State)}. calculate_msg_expiry(_State = #q{ttl = undefined}) -> undefined; calculate_msg_expiry(_State = #q{ttl = TTL}) -> - now_millis() + (TTL * 1000). + now_millis() + (TTL * 1000). drop_expired_messages(State = #q{ttl = undefined}) -> State; -drop_expired_messages(State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> +drop_expired_messages(State = #q{backing_queue_state = BQS, + backing_queue = BQ}) -> Now = now_millis(), BQS1 = BQ:dropwhile( - fun (_MsgProperties = #msg_properties{expiry=Expiry}) -> + fun (_MsgProperties = #message_properties{expiry = Expiry}) -> Now > Expiry end, BQS), ensure_ttl_timer(State #q{backing_queue_state = BQS1}). -ensure_ttl_timer(State = #q{backing_queue = BQ, - backing_queue_state = BQS, - ttl = TTL, - ttl_timer_ref = undefined}) +ensure_ttl_timer(State = #q{backing_queue = BQ, + backing_queue_state = BQS, + ttl = TTL, + ttl_timer_ref = undefined}) when TTL =/= undefined-> case BQ:is_empty(BQS) of true -> State; false -> - State#q{ttl_timer_ref = + State#q{ttl_timer_ref = timer:send_after(TTL, self(), drop_expired)} end; ensure_ttl_timer(State) -> @@ -622,8 +623,7 @@ ensure_ttl_timer(State) -> now_millis() -> timer:now_diff(now(), {0,0,0}). - - + infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(name, #q{q = #amqqueue{name = Name}}) -> Name; @@ -752,7 +752,7 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> %% %% we don't need an expiry here because messages are not being - %% enqueued, so we use an empty msg_properties. + %% enqueued, so we use an empty message_properties. {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State), reply(Delivered, NewState); @@ -781,7 +781,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, AckRequired = not NoAck, State1 = ensure_expiry_timer(State), case fetch(AckRequired, drop_expired_messages(State1)) of - {empty, State2} -> + {empty, State2} -> reply(empty, State2); {{Message, IsDelivered, AckTag, Remaining}, State2} -> case AckRequired of diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index 513b14df..11056c8e 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -69,7 +69,8 @@ -type(pmsg() :: {rabbit_amqqueue:name(), pkey()}). -type(work_item() :: - {publish, rabbit_types:message(), rabbit_types:msg_properties(), pmsg()} | + {publish, + rabbit_types:message(), rabbit_types:message_properties(), pmsg()} | {deliver, pmsg()} | {ack, pmsg()}). diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 6568aa70..c5a3da53 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -141,7 +141,7 @@ -define(REL_SEQ_ONLY_ENTRY_LENGTH_BYTES, 2). %% publish record is binary 1 followed by a bit for is_persistent, -%% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits +%% then 14 bits of rel seq id, 64 bits for message expiry and 128 bits %% of md5sum msg id -define(PUBLISH_PREFIX, 1). -define(PUBLISH_PREFIX_BITS, 1). @@ -205,15 +205,16 @@ {'undefined' | non_neg_integer(), [any()], qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). --spec(publish/5 :: (rabbit_guid:guid(), seq_id(), rabbit_types:msg_properties(), - boolean(), qistate()) -> qistate()). +-spec(publish/5 :: (rabbit_guid:guid(), seq_id(), + rabbit_types:message_properties(), boolean(), qistate()) + -> qistate()). -spec(deliver/2 :: ([seq_id()], qistate()) -> qistate()). -spec(ack/2 :: ([seq_id()], qistate()) -> qistate()). -spec(sync/2 :: ([seq_id()], qistate()) -> qistate()). -spec(flush/1 :: (qistate()) -> qistate()). -spec(read/3 :: (seq_id(), seq_id(), qistate()) -> - {[{rabbit_guid:guid(), seq_id(), - rabbit_types:msg_properties(), + {[{rabbit_guid:guid(), seq_id(), + rabbit_types:message_properties(), boolean(), boolean()}], qistate()}). -spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). -spec(bounds/1 :: (qistate()) -> @@ -258,7 +259,7 @@ delete_and_terminate(State) -> ok = rabbit_misc:recursive_delete([Dir]), State1. -publish(Guid, SeqId, MsgProperties, IsPersistent, State) +publish(Guid, SeqId, MsgProperties, IsPersistent, State) when is_binary(Guid) -> ?GUID_BYTES = size(Guid), {JournalHdl, State1} = get_journal_handle(State), @@ -266,7 +267,7 @@ publish(Guid, SeqId, MsgProperties, IsPersistent, State) JournalHdl, [<<(case IsPersistent of true -> ?PUB_PERSIST_JPREFIX; false -> ?PUB_TRANS_JPREFIX - end):?JPREFIX_BITS, + end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, create_pub_record_body(Guid, MsgProperties)]), maybe_flush_journal( @@ -463,8 +464,8 @@ recover_segment(ContainsCheckFun, CleanShutdown, {SegEntries1, UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( - fun (RelSeq, - {{Guid, _MsgProperties, _IsPersistent}, Del, no_ack}, + fun (RelSeq, + {{Guid, _MsgProperties, _IsPersistent}, Del, no_ack}, Segment1) -> recover_message(ContainsCheckFun(Guid), CleanShutdown, Del, RelSeq, Segment1) @@ -518,8 +519,8 @@ queue_index_walker_reader(QueueName, Gatherer) -> State = #qistate { segments = Segments, dir = Dir } = recover_journal(blank_state(QueueName)), [ok = segment_entries_foldr( - fun (_RelSeq, - {{Guid, _MsgProps, true}, _IsDelivered, no_ack}, + fun (_RelSeq, + {{Guid, _MsgProps, true}, _IsDelivered, no_ack}, ok) -> gatherer:in(Gatherer, {Guid, 1}); (_RelSeq, _Value, Acc) -> @@ -533,7 +534,7 @@ queue_index_walker_reader(QueueName, Gatherer) -> %% expiry/binary manipulation %%---------------------------------------------------------------------------- -create_pub_record_body(Guid, #msg_properties{expiry = Expiry}) -> +create_pub_record_body(Guid, #message_properties{expiry = Expiry}) -> [Guid, expiry_to_binary(Expiry)]. expiry_to_binary(undefined) -> @@ -552,11 +553,11 @@ read_pub_record_body(Hdl) -> ?NO_EXPIRY -> undefined; X -> X end, - {Guid, #msg_properties{expiry = Exp}}; + {Guid, #message_properties{expiry = Exp}}; Error -> Error end. - + %%---------------------------------------------------------------------------- %% journal manipulation %%---------------------------------------------------------------------------- @@ -806,8 +807,8 @@ read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq}, {Messages, Segments}, Dir) -> Segment = segment_find_or_new(Seg, Dir, Segments), {segment_entries_foldr( - fun (RelSeq, - {{Guid, MsgProperties, IsPersistent}, IsDelivered, no_ack}, + fun (RelSeq, + {{Guid, MsgProperties, IsPersistent}, IsDelivered, no_ack}, Acc) when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso (Seg < EndSeg orelse EndRelSeq >= RelSeq) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index d2489685..638a45e1 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1639,7 +1639,7 @@ queue_index_publish(SeqIds, Persistent, Qi) -> fun (SeqId, {QiN, SeqIdsGuidsAcc, MSCStateN}) -> Guid = rabbit_guid:guid(), QiM = rabbit_queue_index:publish( - Guid, SeqId, #msg_properties{}, Persistent, QiN), + Guid, SeqId, #message_properties{}, Persistent, QiN), {ok, MSCStateM} = rabbit_msg_store:write(MsgStore, Guid, Guid, MSCStateN), {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc], MSCStateM} @@ -1661,9 +1661,9 @@ test_queue_index_props() -> with_empty_test_queue( fun(Qi0) -> Guid = rabbit_guid:guid(), - Props = #msg_properties{expiry=12345}, + Props = #message_properties{expiry=12345}, Qi1 = rabbit_queue_index:publish(Guid, 1, Props, true, Qi0), - {[{Guid, 1, Props, _, _}], Qi2} = + {[{Guid, 1, Props, _, _}], Qi2} = rabbit_queue_index:read(1, 2, Qi1), Qi2 end), @@ -1802,12 +1802,12 @@ variable_queue_publish(IsPersistent, Count, VQ) -> fun (_N, VQN) -> rabbit_variable_queue:publish( rabbit_basic:message( - rabbit_misc:r(<<>>, exchange, <<>>), + rabbit_misc:r(<<>>, exchange, <<>>), <<>>, #'P_basic'{delivery_mode = case IsPersistent of true -> 2; false -> 1 - end}, <<>>), - #msg_properties{}, VQN) + end}, <<>>), + #message_properties{}, VQN) end, VQ, lists:seq(1, Count)). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> @@ -1853,14 +1853,14 @@ test_dropwhile(VQ0) -> fun (N, VQN) -> rabbit_variable_queue:publish( rabbit_basic:message( - rabbit_misc:r(<<>>, exchange, <<>>), - <<>>, #'P_basic'{}, <<>>), - #msg_properties{expiry = N}, VQN) + rabbit_misc:r(<<>>, exchange, <<>>), + <<>>, #'P_basic'{}, <<>>), + #message_properties{expiry = N}, VQN) end, VQ0, lists:seq(1, Count)), %% drop the first 5 messages VQ2 = rabbit_variable_queue:dropwhile( - fun(#msg_properties { expiry = Expiry }) -> + fun(#message_properties { expiry = Expiry }) -> Expiry =< 5 end, VQ1), @@ -1875,7 +1875,7 @@ test_dropwhile(VQ0) -> {empty, VQ4} = rabbit_variable_queue:fetch(false, VQ3), VQ4. - + test_variable_queue_dynamic_duration_change(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 1db23883..7671267c 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -42,7 +42,7 @@ binding/0, amqqueue/0, exchange/0, connection/0, protocol/0, user/0, ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, channel_exit/0, connection_exit/0, - msg_properties/0]). + message_properties/0]). -type(channel_exit() :: no_return()). -type(connection_exit() :: no_return()). @@ -87,8 +87,8 @@ txn :: maybe(txn()), sender :: pid(), message :: message()}). --type(msg_properties() :: - #msg_properties{expiry :: pos_integer() | 'undefined'}). +-type(message_properties() :: + #message_properties{expiry :: pos_integer() | 'undefined'}). %% this is really an abstract type, but dialyzer does not support them -type(txn() :: rabbit_guid:guid()). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 78958717..72fa4aeb 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -524,22 +524,22 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, in_counter = InCount + 1, persistent_count = PCount1, pending_ack = PA1 })}. - + dropwhile(Pred, State) -> case internal_queue_out( fun(MsgStatus = #msg_status { msg_properties = MsgProps }, State1) -> case Pred(MsgProps) of true -> - {_, State2} = internal_fetch(false, + {_, State2} = internal_fetch(false, MsgStatus, State1), - dropwhile(Pred, State2); + dropwhile(Pred, State2); false -> %% message needs to go back into Q4 (or %% maybe go in for the first time if it was %% loaded from Q3). Also the msg contents %% might not be in RAM, so read them in now - {MsgStatus1, State2 = #vqstate { q4 = Q4 }} = + {MsgStatus1, State2 = #vqstate { q4 = Q4 }} = read_msg(MsgStatus, State1), State2 #vqstate {q4 = queue:in_r(MsgStatus1, Q4)} end @@ -550,11 +550,11 @@ dropwhile(Pred, State) -> fetch(AckRequired, State) -> internal_queue_out( - fun(MsgStatus, State1) -> + fun(MsgStatus, State1) -> %% it's possible that the message wasn't read from disk %% at this point, so read it in. {MsgStatus1, State2} = read_msg(MsgStatus, State1), - internal_fetch(AckRequired, MsgStatus1, State2) + internal_fetch(AckRequired, MsgStatus1, State2) end, State). internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) -> @@ -568,20 +568,20 @@ internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) -> Fun(MsgStatus, State #vqstate { q4 = Q4a }) end. -read_msg(MsgStatus = #msg_status { msg = undefined, - guid = Guid, +read_msg(MsgStatus = #msg_status { msg = undefined, + guid = Guid, index_on_disk = IndexOnDisk, - is_persistent = IsPersistent }, - State = #vqstate { ram_msg_count = RamMsgCount, - ram_index_count = RamIndexCount, + is_persistent = IsPersistent }, + State = #vqstate { ram_msg_count = RamMsgCount, + ram_index_count = RamIndexCount, msg_store_clients = MSCState}) -> - {{ok, Msg = #basic_message {}}, MSCState1} = + {{ok, Msg = #basic_message {}}, MSCState1} = read_from_msg_store(MSCState, IsPersistent, Guid), RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk), true = RamIndexCount1 >= 0, %% ASSERTION - {MsgStatus #msg_status { msg = Msg }, + {MsgStatus #msg_status { msg = Msg }, State #vqstate { ram_msg_count = RamMsgCount + 1, ram_index_count = RamIndexCount1, msg_store_clients = MSCState1 }}; @@ -590,12 +590,12 @@ read_msg(MsgStatus, State) -> internal_fetch(AckRequired, MsgStatus = #msg_status { - msg = Msg, guid = Guid, seq_id = SeqId, + msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }, - State = #vqstate { - ram_msg_count = RamMsgCount, out_counter = OutCount, - index_state = IndexState, len = Len, persistent_count = PCount, + msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }, + State = #vqstate { + ram_msg_count = RamMsgCount, out_counter = OutCount, + index_state = IndexState, len = Len, persistent_count = PCount, pending_ack = PA }) -> %% 1. Mark it delivered if necessary IndexState1 = maybe_write_delivered( @@ -637,7 +637,7 @@ internal_fetch(AckRequired, len = Len1, persistent_count = PCount1, pending_ack = PA1 })}. - + ack(AckTags, State) -> a(ack(fun rabbit_msg_store:remove/2, fun (_AckEntry, State1) -> State1 end, @@ -682,20 +682,20 @@ tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable }) -> a(case IsDurable andalso HasPersistentPubs of true -> ok = rabbit_msg_store:sync( ?PERSISTENT_MSG_STORE, PersistentGuids, - msg_store_callback(PersistentGuids,Pubs, AckTags1, + msg_store_callback(PersistentGuids,Pubs, AckTags1, Fun, MsgPropsFun)), State; - false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1, + false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1, Fun, MsgPropsFun, State) end)}. requeue(AckTags, MsgPropsFun, State) -> a(reduce_memory_use( ack(fun rabbit_msg_store:release/2, - fun (#msg_status { msg = Msg, + fun (#msg_status { msg = Msg, msg_properties = MsgProperties }, State1) -> - {_SeqId, State2} = - publish(Msg, MsgPropsFun(MsgProperties), true, + {_SeqId, State2} = + publish(Msg, MsgPropsFun(MsgProperties), true, false, State1), State2; ({IsPersistent, Guid, MsgProperties}, State1) -> @@ -852,7 +852,7 @@ one_if(false) -> 0. cons_if(true, E, L) -> [E | L]; cons_if(false, _E, L) -> L. -msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, +msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, MsgProperties) -> #msg_status { seq_id = SeqId, guid = Guid, msg = Msg, is_persistent = IsPersistent, is_delivered = false, @@ -892,8 +892,8 @@ store_tx(Txn, Tx) -> put({txn, Txn}, Tx). erase_tx(Txn) -> erase({txn, Txn}). persistent_guids(Pubs) -> - [Guid || - {#basic_message { guid = Guid, is_persistent = true }, + [Guid || + {#basic_message { guid = Guid, is_persistent = true }, _MsgProps} <- Pubs]. betas_from_index_entries(List, TransientThreshold, IndexState) -> @@ -963,7 +963,7 @@ msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) -> Self = self(), F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( Self, fun (StateN) -> tx_commit_post_msg_store( - true, Pubs, AckTags, + true, Pubs, AckTags, Fun, MsgPropsFun, StateN) end) end, @@ -989,7 +989,7 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun, true -> [AckTag || AckTag <- AckTags, case dict:fetch(AckTag, PA) of #msg_status {} -> false; - {IsPersistent, + {IsPersistent, _Guid, _MsgProps} -> IsPersistent end]; false -> [] @@ -1026,11 +1026,11 @@ tx_commit_index(State = #vqstate { on_sync = #sync { Pubs = lists:append(lists:reverse(SPubs)), {SeqIds, State1 = #vqstate { index_state = IndexState }} = lists:foldl( - fun ({Msg = #basic_message { is_persistent = IsPersistent }, + fun ({Msg = #basic_message { is_persistent = IsPersistent }, MsgProperties}, {SeqIdsAcc, State2}) -> IsPersistent1 = IsDurable andalso IsPersistent, - {SeqId, State3} = + {SeqId, State3} = publish(Msg, MsgProperties, false, IsPersistent1, State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} end, {PAcks, ack(Acks, State)}, Pubs), @@ -1098,7 +1098,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, ram_msg_count = RamMsgCount }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProperties)) - #msg_status { is_delivered = IsDelivered, + #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk}, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = case bpqueue:is_empty(Q3) of @@ -1146,8 +1146,8 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status { IndexState) when Force orelse IsPersistent -> true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION - IndexState1 = rabbit_queue_index:publish(Guid, - SeqId, + IndexState1 = rabbit_queue_index:publish(Guid, + SeqId, MsgProperties, IsPersistent, IndexState), @@ -1172,8 +1172,8 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, - msg_on_disk = MsgOnDisk, - msg_properties = MsgProperties } = MsgStatus, + msg_on_disk = MsgOnDisk, + msg_properties = MsgProperties } = MsgStatus, PA) -> AckEntry = case MsgOnDisk of true -> {IsPersistent, Guid, MsgProperties}; @@ -1227,8 +1227,8 @@ accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, index_on_disk = false }, Acc) -> Acc; -accumulate_ack(SeqId, - {IsPersistent, Guid, _MsgProperties}, +accumulate_ack(SeqId, + {IsPersistent, Guid, _MsgProperties}, {SeqIdsAcc, Dict}) -> {cons_if(IsPersistent, SeqId, SeqIdsAcc), rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}. -- cgit v1.2.1 From 9bc1a5b3e2a351522ae74e83352b73aaa2f23c3e Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 6 Oct 2010 15:54:24 +0100 Subject: Trailing whitespace only --- src/rabbit_amqqueue.erl | 13 ++++++------- src/rabbit_invariable_queue.erl | 32 ++++++++++++++++---------------- 2 files changed, 22 insertions(+), 23 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 83a13f2c..0077e223 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -312,19 +312,19 @@ check_declare_arguments(QueueName, Args) -> precondition_failed, "invalid arg '~s' for ~s: ~w", [Key, rabbit_misc:rs(QueueName), Error]) - end || {Key, Fun} <- + end || {Key, Fun} <- [{<<"x-expires">>, fun check_expires_argument/1}, {<<"x-message-ttl">>, fun check_message_ttl_argument/1}]], ok. check_expires_argument(Val) -> - check_integer_argument(Val, - expires_not_of_acceptable_type, + check_integer_argument(Val, + expires_not_of_acceptable_type, expires_zero_or_less). check_message_ttl_argument(Val) -> - check_integer_argument(Val, - ttl_not_of_acceptable_type, + check_integer_argument(Val, + ttl_not_of_acceptable_type, ttl_zero_or_less). check_integer_argument(undefined, _, _) -> @@ -336,7 +336,7 @@ check_integer_argument({Type, Val}, InvalidTypeError, _) when Val > 0 -> end; check_integer_argument({_Type, _Val}, _, ZeroOrLessError) -> {error, ZeroOrLessError}. - + list(VHostPath) -> mnesia:dirty_match_object( rabbit_queue, @@ -524,4 +524,3 @@ delegate_call(Pid, Msg, Timeout) -> delegate_cast(Pid, Msg) -> delegate:invoke(Pid, fun (P) -> gen_server2:cast(P, Msg) end). - diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index feb7c7e1..2415468f 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -89,7 +89,7 @@ purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable, %% We do not purge messages pending acks. {AckTags, PA} = rabbit_misc:queue_fold( - fun ({#basic_message { is_persistent = false }, + fun ({#basic_message { is_persistent = false }, _MsgProps, _IsDelivered}, Acc) -> Acc; ({Msg = #basic_message { guid = Guid }, MsgProps, IsDelivered}, @@ -100,8 +100,8 @@ purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable, ok = persist_acks(QName, IsDurable, none, AckTags, PA), {Len, State #iv_state { len = 0, queue = queue:new() }}. -publish(Msg, MsgProps, State = #iv_state { queue = Q, - qname = QName, +publish(Msg, MsgProps, State = #iv_state { queue = Q, + qname = QName, durable = IsDurable, len = Len }) -> ok = persist_message(QName, IsDurable, none, Msg, MsgProps), @@ -124,7 +124,7 @@ dropwhile(Pred, State = #iv_state { queue = Q }) -> {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q), case Pred(MsgProps) of true -> - {_, State1} = + {_, State1} = fetch_internal(false, Q1, Msg, MsgProps, IsDelivered, State), dropwhile(Pred, State1); false -> @@ -137,10 +137,10 @@ fetch(AckRequired, State = #iv_state { queue = Q }) -> {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q), fetch_internal(AckRequired, Q1, Msg, MsgProps, IsDelivered, State). -fetch_internal(AckRequired, Q1, - Msg = #basic_message {guid = Guid}, +fetch_internal(AckRequired, Q1, + Msg = #basic_message {guid = Guid}, MsgProps, IsDelivered, - State = #iv_state { len = Len, + State = #iv_state { len = Len, qname = QName, durable = IsDurable, pending_ack = PA }) -> @@ -183,9 +183,9 @@ tx_rollback(Txn, State = #iv_state { qname = QName }) -> erase_tx(Txn), {lists:flatten(AckTags), State}. -tx_commit(Txn, Fun, MsgPropsFun, State = #iv_state { qname = QName, - pending_ack = PA, - queue = Q, +tx_commit(Txn, Fun, MsgPropsFun, State = #iv_state { qname = QName, + pending_ack = PA, + queue = Q, len = Len }) -> #tx { pending_acks = AckTags, pending_messages = PubsRev } = lookup_tx(Txn), ok = do_if_persistent(fun rabbit_persister:commit_transaction/1, @@ -195,13 +195,13 @@ tx_commit(Txn, Fun, MsgPropsFun, State = #iv_state { qname = QName, AckTags1 = lists:flatten(AckTags), PA1 = remove_acks(AckTags1, PA), {Q1, Len1} = lists:foldr(fun ({Msg, MsgProps}, {QN, LenN}) -> - {enqueue(Msg, MsgPropsFun(MsgProps), - false, QN), + {enqueue(Msg, MsgPropsFun(MsgProps), + false, QN), LenN + 1} end, {Q, Len}, PubsRev), {AckTags1, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }}. -requeue(AckTags, MsgPropsFun, State = #iv_state { pending_ack = PA, +requeue(AckTags, MsgPropsFun, State = #iv_state { pending_ack = PA, queue = Q, len = Len }) -> %% We don't need to touch the persister here - the persister will @@ -217,7 +217,7 @@ requeue(AckTags, MsgPropsFun, State = #iv_state { pending_ack = PA, fun (Guid, {QN, LenN}) -> {Msg = #basic_message {}, MsgProps} = dict:fetch(Guid, PA), - {enqueue(Msg, MsgPropsFun(MsgProps), true, QN), + {enqueue(Msg, MsgPropsFun(MsgProps), true, QN), LenN + 1} end, {Q, Len}, AckTags), PA1 = remove_acks(AckTags, PA), @@ -286,7 +286,7 @@ persist_message(QName, true, Txn, Msg = #basic_message { content = rabbit_binary_parser:clear_decoded_content( Msg #basic_message.content)}, persist_work(Txn, QName, - [{publish, Msg1, MsgProps, + [{publish, Msg1, MsgProps, {QName, Msg1 #basic_message.guid}}]); persist_message(_QName, _IsDurable, _Txn, _Msg, _MsgProps) -> ok. @@ -301,7 +301,7 @@ persist_acks(QName, true, Txn, AckTags, PA) -> persist_work(Txn, QName, [{ack, {QName, Guid}} || Guid <- AckTags, begin - {ok, {Msg, _MsgProps}} + {ok, {Msg, _MsgProps}} = dict:find(Guid, PA), Msg #basic_message.is_persistent end]); -- cgit v1.2.1 From e858ef57e7c03c697f3dcc39612f2291de09920c Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 6 Oct 2010 15:57:37 +0100 Subject: Overeager M-/... I did run dialyzer but it failed to spot it. --- include/rabbit_backing_queue_spec.hrl | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index dcff5b37..a22f85fa 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -33,8 +33,8 @@ ('empty' | %% Message, IsDelivered, AckTag, Remaining_Len {rabbit_types:basic_message(), boolean(), ack(), non_neg_integer()})). --type(peek_result() :: ('empty'|{rabbit_types:basic_message(), - rabbit_types:message_properties()})). +-type(peek_result() :: ('empty' | {rabbit_types:basic_message(), + rabbit_types:message_properties()})). -type(is_durable() :: boolean()). -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). @@ -51,8 +51,7 @@ -spec(delete_and_terminate/1 :: (state()) -> state()). -spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). -spec(publish/3 :: (rabbit_types:basic_message(), - rabbit_types:message_properties_properties(), state()) - -> state()). + rabbit_types:message_properties(), state()) -> state()). -spec(publish_delivered/4 :: (ack_required(), rabbit_types:basic_message(), rabbit_types:message_properties(), state()) -> {ack(), state()}). -- cgit v1.2.1 From 249397d53e858f49e5d55fb0f190555953b93add Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 6 Oct 2010 16:40:38 +0100 Subject: cosmetic --- src/rabbit_amqqueue_process.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 91d3f586..d941fd35 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -456,7 +456,7 @@ requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> end, State). fetch(AckRequired, State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> + backing_queue = BQ}) -> case BQ:fetch(AckRequired, BQS) of {empty, BQS1} -> {empty, State#q{backing_queue_state = BQS1}}; -- cgit v1.2.1 From 3f3caf736eed01f3670ca6d69b4a1a89952910d3 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 6 Oct 2010 17:21:59 +0100 Subject: cosmetic --- src/rabbit_amqqueue_process.erl | 4 ++-- src/rabbit_invariable_queue.erl | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d941fd35..ff43accc 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -785,8 +785,8 @@ handle_call({basic_get, ChPid, NoAck}, _From, reply(empty, State2); {{Message, IsDelivered, AckTag, Remaining}, State2} -> case AckRequired of - true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), - store_ch_record( + true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), + store_ch_record( C#cr{acktags = sets:add_element(AckTag, ChAckTags)}); false -> ok end, diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index 2415468f..bc58cc8c 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -105,8 +105,7 @@ publish(Msg, MsgProps, State = #iv_state { queue = Q, durable = IsDurable, len = Len }) -> ok = persist_message(QName, IsDurable, none, Msg, MsgProps), - Q1 = enqueue(Msg, MsgProps, false, Q), - State #iv_state { queue = Q1, len = Len + 1 }. + State #iv_state { queue = enqueue(Msg, MsgProps, false, Q), len = Len + 1 }. publish_delivered(false, _Msg, _MsgProps, State) -> {blank_ack, State}; @@ -163,7 +162,7 @@ ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable, State #iv_state { pending_ack = PA1 }. tx_publish(Txn, Msg, MsgProps, State = #iv_state { qname = QName, - durable = IsDurable }) -> + durable = IsDurable }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }), ok = persist_message(QName, IsDurable, Txn, Msg, MsgProps), -- cgit v1.2.1 From 72c74753173d556c8bdcdad2a0634342094d34e1 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 6 Oct 2010 17:50:32 +0100 Subject: cosmetic --- src/rabbit_queue_index.erl | 29 ++++++++++++++--------------- src/rabbit_tests.erl | 28 ++++++++++++++-------------- 2 files changed, 28 insertions(+), 29 deletions(-) diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index c5a3da53..f84dff83 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -98,12 +98,12 @@ %% and seeding the message store on start up. %% %% Note that in general, the representation of a message's state as -%% the tuple: {('no_pub'|{Guid, MsgProperties, IsPersistent}), ('del'|'no_del'), -%% ('ack'|'no_ack')} is richer than strictly necessary for most -%% operations. However, for startup, and to ensure the safe and -%% correct combination of journal entries with entries read from the -%% segment on disk, this richer representation vastly simplifies and -%% clarifies the code. +%% the tuple: {('no_pub'|{Guid, MsgProperties, IsPersistent}), +%% ('del'|'no_del'), ('ack'|'no_ack')} is richer than strictly +%% necessary for most operations. However, for startup, and to ensure +%% the safe and correct combination of journal entries with entries +%% read from the segment on disk, this richer representation vastly +%% simplifies and clarifies the code. %% %% For notes on Clean Shutdown and startup, see documentation in %% variable_queue. @@ -300,7 +300,7 @@ flush(State) -> flush_journal(State). read(StartEnd, StartEnd, State) -> {[], State}; -read(Start, End, State = #qistate { segments = Segments, +read(Start, End, State = #qistate { segments = Segments, dir = Dir }) when Start =< End -> %% Start is inclusive, End is exclusive. LowerB = {StartSeg, _StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start), @@ -537,10 +537,8 @@ queue_index_walker_reader(QueueName, Gatherer) -> create_pub_record_body(Guid, #message_properties{expiry = Expiry}) -> [Guid, expiry_to_binary(Expiry)]. -expiry_to_binary(undefined) -> - <>; -expiry_to_binary(Expiry) -> - <>. +expiry_to_binary(undefined) -> <>; +expiry_to_binary(Expiry) -> <>. read_pub_record_body(Hdl) -> case file_handle_cache:read(Hdl, ?GUID_BYTES + ?EXPIRY_BYTES) of @@ -681,10 +679,11 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> _ -> case read_pub_record_body(Hdl) of {Guid, MsgProperties} -> - Publish = {Guid, MsgProperties, case Prefix of - ?PUB_PERSIST_JPREFIX -> true; - ?PUB_TRANS_JPREFIX -> false - end}, + Publish = {Guid, MsgProperties, + case Prefix of + ?PUB_PERSIST_JPREFIX -> true; + ?PUB_TRANS_JPREFIX -> false + end}, load_journal_entries( add_to_journal(SeqId, Publish, State)); _ErrOrEoF -> %% err, we've lost at least a publish diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 638a45e1..72321f78 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1850,26 +1850,26 @@ test_dropwhile(VQ0) -> %% add messages with sequential expiry VQ1 = lists:foldl( - fun (N, VQN) -> - rabbit_variable_queue:publish( - rabbit_basic:message( - rabbit_misc:r(<<>>, exchange, <<>>), - <<>>, #'P_basic'{}, <<>>), - #message_properties{expiry = N}, VQN) - end, VQ0, lists:seq(1, Count)), + fun (N, VQN) -> + rabbit_variable_queue:publish( + rabbit_basic:message( + rabbit_misc:r(<<>>, exchange, <<>>), + <<>>, #'P_basic'{}, <<>>), + #message_properties{expiry = N}, VQN) + end, VQ0, lists:seq(1, Count)), %% drop the first 5 messages VQ2 = rabbit_variable_queue:dropwhile( - fun(#message_properties { expiry = Expiry }) -> - Expiry =< 5 - end, VQ1), + fun(#message_properties { expiry = Expiry }) -> + Expiry =< 5 + end, VQ1), %% fetch five now VQ3 = lists:foldl(fun (_N, VQN) -> - {{#basic_message{}, _, _, _}, VQM} = - rabbit_variable_queue:fetch(false, VQN), - VQM - end, VQ2, lists:seq(1, 5)), + {{#basic_message{}, _, _, _}, VQM} = + rabbit_variable_queue:fetch(false, VQN), + VQM + end, VQ2, lists:seq(6, Count)), %% should be empty now {empty, VQ4} = rabbit_variable_queue:fetch(false, VQ3), -- cgit v1.2.1 From 7321cd2ffca115b78cd6acb7f6548e64a7e4e8a9 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 6 Oct 2010 18:07:07 +0100 Subject: Refactoring and cosmetic --- src/rabbit_variable_queue.erl | 62 ++++++++++++++++++++----------------------- 1 file changed, 29 insertions(+), 33 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 72fa4aeb..608e2dcd 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -526,27 +526,26 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, pending_ack = PA1 })}. dropwhile(Pred, State) -> - case internal_queue_out( - fun(MsgStatus = #msg_status { msg_properties = MsgProps }, - State1) -> - case Pred(MsgProps) of - true -> - {_, State2} = internal_fetch(false, - MsgStatus, State1), - dropwhile(Pred, State2); - false -> - %% message needs to go back into Q4 (or - %% maybe go in for the first time if it was - %% loaded from Q3). Also the msg contents - %% might not be in RAM, so read them in now - {MsgStatus1, State2 = #vqstate { q4 = Q4 }} = - read_msg(MsgStatus, State1), - State2 #vqstate {q4 = queue:in_r(MsgStatus1, Q4)} - end - end, State) of - {empty, StateR} -> StateR; - StateR -> StateR - end. + {_OkOrEmpty, State1} = dropwhile1(Pred, State), + State1. + +dropwhile1(Pred, State) -> + internal_queue_out( + fun(MsgStatus = #msg_status { msg_properties = MsgProps }, State1) -> + case Pred(MsgProps) of + true -> + {_, State2} = internal_fetch(false, MsgStatus, State1), + dropwhile1(Pred, State2); + false -> + %% message needs to go back into Q4 (or maybe go + %% in for the first time if it was loaded from + %% Q3). Also the msg contents might not be in + %% RAM, so read them in now + {MsgStatus1, State2 = #vqstate { q4 = Q4 }} = + read_msg(MsgStatus, State1), + {ok, State2 #vqstate {q4 = queue:in_r(MsgStatus1, Q4) }} + end + end, State). fetch(AckRequired, State) -> internal_queue_out( @@ -589,14 +588,14 @@ read_msg(MsgStatus, State) -> {MsgStatus, State}. internal_fetch(AckRequired, - MsgStatus = #msg_status { - msg = Msg, guid = Guid, seq_id = SeqId, - is_persistent = IsPersistent, is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }, - State = #vqstate { - ram_msg_count = RamMsgCount, out_counter = OutCount, - index_state = IndexState, len = Len, persistent_count = PCount, - pending_ack = PA }) -> + MsgStatus = #msg_status { + msg = Msg, guid = Guid, seq_id = SeqId, + is_persistent = IsPersistent, is_delivered = IsDelivered, + msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }, + State = #vqstate { + ram_msg_count = RamMsgCount, out_counter = OutCount, + index_state = IndexState, len = Len, persistent_count = PCount, + pending_ack = PA }) -> %% 1. Mark it delivered if necessary IndexState1 = maybe_write_delivered( IndexOnDisk andalso not IsDelivered, @@ -625,11 +624,8 @@ internal_fetch(AckRequired, PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), Len1 = Len - 1, + RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), - RamMsgCount1 = case Msg =:= undefined of - true -> RamMsgCount; - false -> RamMsgCount - 1 - end, {{Msg, IsDelivered, AckTag, Len1}, a(State #vqstate { ram_msg_count = RamMsgCount1, out_counter = OutCount + 1, -- cgit v1.2.1 From 7d15df90953e33d7e7c20b5268a63fa5df4607b4 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 6 Oct 2010 18:34:54 +0100 Subject: cosmetics --- src/rabbit_variable_queue.erl | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 608e2dcd..e74447e2 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -678,7 +678,7 @@ tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable }) -> a(case IsDurable andalso HasPersistentPubs of true -> ok = rabbit_msg_store:sync( ?PERSISTENT_MSG_STORE, PersistentGuids, - msg_store_callback(PersistentGuids,Pubs, AckTags1, + msg_store_callback(PersistentGuids, Pubs, AckTags1, Fun, MsgPropsFun)), State; false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1, @@ -888,9 +888,8 @@ store_tx(Txn, Tx) -> put({txn, Txn}, Tx). erase_tx(Txn) -> erase({txn, Txn}). persistent_guids(Pubs) -> - [Guid || - {#basic_message { guid = Guid, is_persistent = true }, - _MsgProps} <- Pubs]. + [Guid || {#basic_message { guid = Guid, is_persistent = true }, _MsgProps} + <- Pubs]. betas_from_index_entries(List, TransientThreshold, IndexState) -> {Filtered, Delivers, Acks} = @@ -1337,9 +1336,7 @@ fetch_from_q3(State = #vqstate { {empty, _Q3} -> {empty, State}; {{value, _IndexOnDisk, MsgStatus}, Q3a} -> - - State1 = State #vqstate { q3 = Q3a}, - + State1 = State #vqstate { q3 = Q3a }, State2 = case {bpqueue:is_empty(Q3a), 0 == DeltaCount} of {true, true} -> -- cgit v1.2.1 From b4bfdde3c0e73a0a83ab0b4cf95914490843fce3 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Wed, 6 Oct 2010 20:25:51 +0100 Subject: changed usage of #message_properties{} to ?BASE_MESSAGE_PROPERTIES --- src/rabbit_amqqueue_process.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index ff43accc..b0365ee3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -425,7 +425,7 @@ attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Message, - #message_properties{}, BQS), + ?BASE_MESSAGE_PROPERTIES, BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} end, @@ -435,7 +435,7 @@ attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, record_current_channel_tx(ChPid, Txn), {true, State#q{backing_queue_state = - BQ:tx_publish(Txn, Message, #message_properties{}, BQS)}}. + BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}. deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> case attempt_delivery(Txn, ChPid, Message, State) of -- cgit v1.2.1 From 81a906368ee24038d1c9bb9d0b00b8214711a9ac Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Wed, 6 Oct 2010 20:35:42 +0100 Subject: narrowed the capture on reset_msg_expiry_fun --- src/rabbit_amqqueue_process.erl | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b0365ee3..8770ff32 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -449,10 +449,10 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> {false, ensure_ttl_timer(NewState#q{backing_queue_state = BQS})} end. -requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> +requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> maybe_run_queue_via_backing_queue( fun (BQS) -> - BQ:requeue(AckTags, reset_msg_expiry_fun(State), BQS) + BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS) end, State). fetch(AckRequired, State = #q{backing_queue_state = BQS, @@ -558,11 +558,12 @@ qname(#q{q = #amqqueue{name = QName}}) -> QName. maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> run_message_queue(State#q{backing_queue_state = Fun(BQS)}). -commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> +commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, + backing_queue_state = BQS, + ttl = TTL}) -> {AckTags, BQS1} = BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, - reset_msg_expiry_fun(State), + reset_msg_expiry_fun(TTL), BQS), %% ChPid must be known here because of the participant management %% by the channel. @@ -582,17 +583,17 @@ rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ, subtract_acks(A, B) when is_list(B) -> lists:foldl(fun sets:del_element/2, A, B). -reset_msg_expiry_fun(State) -> +reset_msg_expiry_fun(TTL) -> fun(MsgProps) -> - MsgProps#message_properties{expiry = calculate_msg_expiry(State)} + MsgProps#message_properties{expiry = calculate_msg_expiry(TTL)} end. -message_properties(State) -> - #message_properties{expiry = calculate_msg_expiry(State)}. +message_properties(#q{ttl=TTL}) -> + #message_properties{expiry = calculate_msg_expiry(TTL)}. -calculate_msg_expiry(_State = #q{ttl = undefined}) -> +calculate_msg_expiry(undefined) -> undefined; -calculate_msg_expiry(_State = #q{ttl = TTL}) -> +calculate_msg_expiry(TTL) -> now_millis() + (TTL * 1000). drop_expired_messages(State = #q{ttl = undefined}) -> -- cgit v1.2.1 From 888556965532313c67eb9be46b7997e2c489b16c Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Wed, 6 Oct 2010 20:36:28 +0100 Subject: removed unnecessary _MsgProperties variable name --- src/rabbit_amqqueue_process.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 8770ff32..4aa3eb8f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -602,7 +602,7 @@ drop_expired_messages(State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> Now = now_millis(), BQS1 = BQ:dropwhile( - fun (_MsgProperties = #message_properties{expiry = Expiry}) -> + fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, BQS), ensure_ttl_timer(State #q{backing_queue_state = BQS1}). -- cgit v1.2.1 From 1dcfb86c14f608a12e70e83cde8ddb3b748b0bab Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Wed, 6 Oct 2010 21:09:30 +0100 Subject: Reworked ensure_ttl_timer to use apply_after rather than send_after --- src/rabbit_amqqueue.erl | 5 ++++- src/rabbit_amqqueue_process.erl | 10 ++++++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 0077e223..25a061ac 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -35,7 +35,7 @@ -export([internal_declare/2, internal_delete/1, maybe_run_queue_via_backing_queue/2, update_ram_duration/1, set_ram_duration_target/2, - set_maximum_since_use/2, maybe_expire/1]). + set_maximum_since_use/2, maybe_expire/1, drop_expired/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, @@ -484,6 +484,9 @@ set_maximum_since_use(QPid, Age) -> maybe_expire(QPid) -> gen_server2:cast(QPid, maybe_expire). +drop_expired(QPid) -> + gen_server2:cast(QPid, drop_expired). + on_node_down(Node) -> [Hook() || Hook <- rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 4aa3eb8f..65166f32 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -617,7 +617,8 @@ ensure_ttl_timer(State = #q{backing_queue = BQ, State; false -> State#q{ttl_timer_ref = - timer:send_after(TTL, self(), drop_expired)} + timer:apply_after(TTL, rabbit_amqqueue, + drop_expired, [self()])} end; ensure_ttl_timer(State) -> State. @@ -685,6 +686,7 @@ prioritise_cast(Msg, _State) -> {set_ram_duration_target, _Duration} -> 8; {set_maximum_since_use, _Age} -> 8; maybe_expire -> 8; + drop_expired -> 8; emit_stats -> 7; {ack, _Txn, _MsgIds, _ChPid} -> 7; {reject, _MsgIds, _Requeue, _ChPid} -> 7; @@ -1008,6 +1010,9 @@ handle_cast(maybe_expire, State) -> false -> noreply(ensure_expiry_timer(State)) end; +handle_cast(drop_expired, State) -> + noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined})); + handle_cast(emit_stats, State = #q{stats_timer = StatsTimer}) -> %% Do not invoke noreply as it would see no timer and create a new one. emit_stats(State), @@ -1034,9 +1039,6 @@ handle_info(timeout, State = #q{backing_queue = BQ}) -> noreply(maybe_run_queue_via_backing_queue( fun (BQS) -> BQ:idle_timeout(BQS) end, State)); -handle_info(drop_expired, State) -> - noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined})); - handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; -- cgit v1.2.1 From de31efa054eb147908b8a789124c9d845a25362a Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Wed, 6 Oct 2010 21:35:33 +0100 Subject: removed ttl from queue ttl info items --- src/rabbit_amqqueue_process.erl | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 65166f32..95a5070b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -98,8 +98,7 @@ durable, auto_delete, arguments, - owner_pid, - ttl + owner_pid ]). -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). @@ -660,8 +659,6 @@ i(memory, _) -> M; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); -i(ttl, #q{ttl = TTL}) -> - TTL; i(Item, _) -> throw({bad_argument, Item}). -- cgit v1.2.1 From 7abf32f6872b951c2185118262eb5834680812ed Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Wed, 6 Oct 2010 21:42:52 +0100 Subject: fixed comment for attempt_delivery --- src/rabbit_amqqueue_process.erl | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 95a5070b..5dd74dbf 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -422,6 +422,9 @@ attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> + %% we don't need an expiry here because messages are + %% not being enqueued, so we use an empty + %% message_properties. {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Message, ?BASE_MESSAGE_PROPERTIES, BQS), @@ -751,8 +754,6 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> %% queues discarding the message? %% - %% we don't need an expiry here because messages are not being - %% enqueued, so we use an empty message_properties. {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State), reply(Delivered, NewState); -- cgit v1.2.1 From bf01fd539ad12113d71d5a60718c5b195ce19a75 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Fri, 8 Oct 2010 12:28:05 +0100 Subject: fixed RamIndexMsgCount accounting --- src/rabbit_variable_queue.erl | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index e74447e2..05e540c0 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -569,20 +569,14 @@ internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) -> read_msg(MsgStatus = #msg_status { msg = undefined, guid = Guid, - index_on_disk = IndexOnDisk, is_persistent = IsPersistent }, - State = #vqstate { ram_msg_count = RamMsgCount, - ram_index_count = RamIndexCount, + State = #vqstate { ram_msg_count = RamMsgCount, msg_store_clients = MSCState}) -> {{ok, Msg = #basic_message {}}, MSCState1} = read_from_msg_store(MSCState, IsPersistent, Guid), - RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk), - true = RamIndexCount1 >= 0, %% ASSERTION - {MsgStatus #msg_status { msg = Msg }, State #vqstate { ram_msg_count = RamMsgCount + 1, - ram_index_count = RamIndexCount1, msg_store_clients = MSCState1 }}; read_msg(MsgStatus, State) -> {MsgStatus, State}. @@ -593,9 +587,9 @@ internal_fetch(AckRequired, is_persistent = IsPersistent, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }, State = #vqstate { - ram_msg_count = RamMsgCount, out_counter = OutCount, - index_state = IndexState, len = Len, persistent_count = PCount, - pending_ack = PA }) -> + ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount, + out_counter = OutCount, index_state = IndexState, len = Len, + persistent_count = PCount, pending_ack = PA }) -> %% 1. Mark it delivered if necessary IndexState1 = maybe_write_delivered( IndexOnDisk andalso not IsDelivered, @@ -626,8 +620,12 @@ internal_fetch(AckRequired, Len1 = Len - 1, RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), + RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk), + true = RamIndexCount1 >= 0, %% ASSERTION + {{Msg, IsDelivered, AckTag, Len1}, a(State #vqstate { ram_msg_count = RamMsgCount1, + ram_index_count = RamIndexCount1, out_counter = OutCount + 1, index_state = IndexState2, len = Len1, -- cgit v1.2.1 From 3cfa9ac6cab3c190e83e3c6df5841bcebea3d658 Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Fri, 8 Oct 2010 14:28:49 +0100 Subject: tweaked RamIndexCount accounting again, now it doesn't get decremented twice when reinserting into the queue. Also moved transactional msgpropsfun application to tx_commit_index --- src/rabbit_variable_queue.erl | 57 +++++++++++++++++++++++-------------------- 1 file changed, 31 insertions(+), 26 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 05e540c0..3cfebb50 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -620,12 +620,8 @@ internal_fetch(AckRequired, Len1 = Len - 1, RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), - RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk), - true = RamIndexCount1 >= 0, %% ASSERTION - {{Msg, IsDelivered, AckTag, Len1}, a(State #vqstate { ram_msg_count = RamMsgCount1, - ram_index_count = RamIndexCount1, out_counter = OutCount + 1, index_state = IndexState2, len = Len1, @@ -987,22 +983,20 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun, end]; false -> [] end, - PubsOrdered = lists:foldl( - fun ({Msg, MsgProps}, Acc) -> - [{Msg, MsgPropsFun(MsgProps)} | Acc] - end, [], Pubs), case IsDurable andalso (HasPersistentPubs orelse PersistentAcks =/= []) of - true -> State #vqstate { on_sync = #sync { - acks_persistent = [PersistentAcks | SPAcks], - acks_all = [AckTags | SAcks], - pubs = [PubsOrdered | SPubs], - funs = [Fun | SFuns] }}; + true -> State #vqstate { + on_sync = #sync { + acks_persistent = [PersistentAcks | SPAcks], + acks_all = [AckTags | SAcks], + pubs = [{MsgPropsFun, Pubs} | SPubs], + funs = [Fun | SFuns] }}; false -> State1 = tx_commit_index( - State #vqstate { on_sync = #sync { - acks_persistent = [], - acks_all = [AckTags], - pubs = [PubsOrdered], - funs = [Fun] } }), + State #vqstate { + on_sync = #sync { + acks_persistent = [], + acks_all = [AckTags], + pubs = [{MsgPropsFun, Pubs}], + funs = [Fun] } }), State1 #vqstate { on_sync = OnSync } end. @@ -1016,7 +1010,13 @@ tx_commit_index(State = #vqstate { on_sync = #sync { durable = IsDurable }) -> PAcks = lists:append(SPAcks), Acks = lists:append(SAcks), - Pubs = lists:append(lists:reverse(SPubs)), + Pubs = lists:foldl( + fun({Fun, PubsN}, OuterAcc) -> + lists:foldl( + fun({Msg, MsgProps}, Acc) -> + [{Msg, Fun(MsgProps)} | Acc] + end, OuterAcc, PubsN) + end, [], SPubs), {SeqIds, State1 = #vqstate { index_state = IndexState }} = lists:foldl( fun ({Msg = #basic_message { is_persistent = IsPersistent }, @@ -1325,16 +1325,21 @@ chunk_size(Current, Permitted) -> lists:min([Current - Permitted, ?IO_BATCH_SIZE]). fetch_from_q3(State = #vqstate { - q1 = Q1, - q2 = Q2, - delta = #delta { count = DeltaCount }, - q3 = Q3, - q4 = Q4 }) -> + q1 = Q1, + q2 = Q2, + delta = #delta { count = DeltaCount }, + q3 = Q3, + q4 = Q4, + ram_index_count = RamIndexCount}) -> case bpqueue:out(Q3) of {empty, _Q3} -> {empty, State}; - {{value, _IndexOnDisk, MsgStatus}, Q3a} -> - State1 = State #vqstate { q3 = Q3a }, + {{value, IndexOnDisk, MsgStatus}, Q3a} -> + RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk), + true = RamIndexCount1 >= 0, %% ASSERTION + + State1 = State #vqstate { q3 = Q3a, + ram_index_count = RamIndexCount1 }, State2 = case {bpqueue:is_empty(Q3a), 0 == DeltaCount} of {true, true} -> -- cgit v1.2.1 From 9fde9e069323b6ba50743c75764f4cce19ed5b5b Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 21 Oct 2010 12:38:10 +0100 Subject: Remove unused variable and remove peek_result() type which is unused --- include/rabbit_backing_queue_spec.hrl | 2 -- src/rabbit_variable_queue.erl | 4 ++-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index a22f85fa..20230b24 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -33,8 +33,6 @@ ('empty' | %% Message, IsDelivered, AckTag, Remaining_Len {rabbit_types:basic_message(), boolean(), ack(), non_neg_integer()})). --type(peek_result() :: ('empty' | {rabbit_types:basic_message(), - rabbit_types:message_properties()})). -type(is_durable() :: boolean()). -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 3cfebb50..587bc939 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -587,8 +587,8 @@ internal_fetch(AckRequired, is_persistent = IsPersistent, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }, State = #vqstate { - ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount, - out_counter = OutCount, index_state = IndexState, len = Len, + ram_msg_count = RamMsgCount, out_counter = OutCount, + index_state = IndexState, len = Len, persistent_count = PCount, pending_ack = PA }) -> %% 1. Mark it delivered if necessary IndexState1 = maybe_write_delivered( -- cgit v1.2.1 From b094e9fbf67da0b5cd224b0b70d4ba91140010c3 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 21 Oct 2010 12:48:12 +0100 Subject: Minor refactor --- src/rabbit_amqqueue_process.erl | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7be5a197..397fe31b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -459,13 +459,8 @@ requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> fetch(AckRequired, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> - case BQ:fetch(AckRequired, BQS) of - {empty, BQS1} -> - {empty, State#q{backing_queue_state = BQS1}}; - {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> - {{Message, IsDelivered, AckTag, Remaining}, - State#q{backing_queue_state = BQS1}} - end. + {Result, BQS1} = BQ:fetch(AckRequired, BQS), + {Result, State#q{backing_queue_state = BQS1}}. add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). -- cgit v1.2.1 From 223169c50fd88e97695a04ea1559b4682389b1d9 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 21 Oct 2010 12:58:29 +0100 Subject: Whitespace --- src/rabbit_amqqueue_process.erl | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 397fe31b..a2fab54c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -602,13 +602,13 @@ drop_expired_messages(State = #q{backing_queue_state = BQS, fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, BQS), - ensure_ttl_timer(State #q{backing_queue_state = BQS1}). + ensure_ttl_timer(State#q{backing_queue_state = BQS1}). ensure_ttl_timer(State = #q{backing_queue = BQ, backing_queue_state = BQS, ttl = TTL, ttl_timer_ref = undefined}) - when TTL =/= undefined-> + when TTL =/= undefined -> case BQ:is_empty(BQS) of true -> State; @@ -748,7 +748,6 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State), reply(Delivered, NewState); -- cgit v1.2.1 From 993fd4a1ed29412237886b511830d508e10fdd0e Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 21 Oct 2010 13:12:27 +0100 Subject: Textuality --- src/rabbit_backing_queue.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 4f71c1a8..352e76fd 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -69,8 +69,8 @@ behaviour_info(callbacks) -> %% (i.e. saves the round trip through the backing queue). {publish_delivered, 4}, - %% Drop messages in the queue while the supplied predicate - %% returns true and return the new state. + %% Drop messages from the head of the queue while the supplied + %% predicate returns true. {dropwhile, 2}, %% Produce the next message. -- cgit v1.2.1 From 4a84bd9d7908931bbe7ce22e7f24129b66e6e7f7 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 21 Oct 2010 15:29:45 +0100 Subject: Cosmetic --- src/rabbit_invariable_queue.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl index bc58cc8c..ad01d8f7 100644 --- a/src/rabbit_invariable_queue.erl +++ b/src/rabbit_invariable_queue.erl @@ -137,7 +137,7 @@ fetch(AckRequired, State = #iv_state { queue = Q }) -> fetch_internal(AckRequired, Q1, Msg, MsgProps, IsDelivered, State). fetch_internal(AckRequired, Q1, - Msg = #basic_message {guid = Guid}, + Msg = #basic_message { guid = Guid }, MsgProps, IsDelivered, State = #iv_state { len = Len, qname = QName, @@ -161,7 +161,7 @@ ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable, PA1 = remove_acks(AckTags, PA), State #iv_state { pending_ack = PA1 }. -tx_publish(Txn, Msg, MsgProps, State = #iv_state { qname = QName, +tx_publish(Txn, Msg, MsgProps, State = #iv_state { qname = QName, durable = IsDurable }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }), -- cgit v1.2.1 From 8ae2c303acb5def8c33b05acf49a9c0b3cfcded3 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 21 Oct 2010 16:55:07 +0100 Subject: Correct spec for sync(), and some cosmetics --- src/rabbit_tests.erl | 3 +-- src/rabbit_variable_queue.erl | 17 ++++++++--------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 0db1150b..00547a26 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1909,8 +1909,7 @@ publish_fetch_and_ack(0, _Len, VQ0) -> VQ0; publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), - {{_Msg, false, AckTag, Len}, VQ2} = - rabbit_variable_queue:fetch(true, VQ1), + {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), publish_fetch_and_ack(N-1, Len, rabbit_variable_queue:ack([AckTag], VQ2)). test_variable_queue_partial_segments_delta_thing(VQ0) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 587bc939..0b948c1b 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -295,7 +295,8 @@ -type(sync() :: #sync { acks_persistent :: [[seq_id()]], acks_all :: [[seq_id()]], - pubs :: [[rabbit_guid:guid()]], + pubs :: [{message_properties_transformer(), + [rabbit_types:basic_message()]}], funs :: [fun (() -> any())] }). -type(state() :: #vqstate { @@ -1010,7 +1011,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync { durable = IsDurable }) -> PAcks = lists:append(SPAcks), Acks = lists:append(SAcks), - Pubs = lists:foldl( + Pubs = lists:foldl( fun({Fun, PubsN}, OuterAcc) -> lists:foldl( fun({Msg, MsgProps}, Acc) -> @@ -1091,8 +1092,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, ram_msg_count = RamMsgCount }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProperties)) - #msg_status { is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk}, + #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk}, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), State2 = case bpqueue:is_empty(Q3) of false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) }; @@ -1131,10 +1131,10 @@ maybe_write_index_to_disk(_Force, MsgStatus = #msg_status { true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION {MsgStatus, IndexState}; maybe_write_index_to_disk(Force, MsgStatus = #msg_status { - guid = Guid, - seq_id = SeqId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, + guid = Guid, + seq_id = SeqId, + is_persistent = IsPersistent, + is_delivered = IsDelivered, msg_properties = MsgProperties}, IndexState) when Force orelse IsPersistent -> @@ -1337,7 +1337,6 @@ fetch_from_q3(State = #vqstate { {{value, IndexOnDisk, MsgStatus}, Q3a} -> RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk), true = RamIndexCount1 >= 0, %% ASSERTION - State1 = State #vqstate { q3 = Q3a, ram_index_count = RamIndexCount1 }, State2 = -- cgit v1.2.1