From 1f5807c9a1124b65a33d28f9986db60f759da7cf Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 21 Oct 2010 00:21:37 +0100 Subject: adapt vq to new msg_store and qi APIs --- src/rabbit_variable_queue.erl | 329 ++++++++++++++++++++++++------------------ 1 file changed, 186 insertions(+), 143 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index cbc71bcc..b40d21e4 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -368,16 +368,17 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). -init(QueueName, IsDurable, Recover) -> - {DeltaCount, Terms, IndexState} = - rabbit_queue_index:init( - QueueName, Recover, - rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), - fun (Guid) -> - rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) - end), - {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), +init(QueueName, IsDurable, false) -> + IndexState = rabbit_queue_index:init(QueueName), + init(IsDurable, IndexState, 0, [], + case IsDurable of + true -> msg_store_client_init(?PERSISTENT_MSG_STORE); + false -> undefined + end, + msg_store_client_init(?TRANSIENT_MSG_STORE)); +init(QueueName, true, true) -> + Terms = rabbit_queue_index:shutdown_terms(QueueName), {PRef, TRef, Terms1} = case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of [] -> {proplists:get_value(persistent_ref, Terms), @@ -385,64 +386,32 @@ init(QueueName, IsDurable, Recover) -> Terms}; _ -> {rabbit_guid:guid(), rabbit_guid:guid(), []} end, - DeltaCount1 = proplists:get_value(persistent_count, Terms1, DeltaCount), - Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of - true -> ?BLANK_DELTA; - false -> #delta { start_seq_id = LowSeqId, - count = DeltaCount1, - end_seq_id = NextSeqId } - end, - Now = now(), - PersistentClient = - case IsDurable of - true -> rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, PRef); - false -> undefined - end, - TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef), - State = #vqstate { - q1 = queue:new(), - q2 = bpqueue:new(), - delta = Delta, - q3 = bpqueue:new(), - q4 = queue:new(), - next_seq_id = NextSeqId, - pending_ack = dict:new(), - index_state = IndexState1, - msg_store_clients = {{PersistentClient, PRef}, - {TransientClient, TRef}}, - on_sync = ?BLANK_SYNC, - durable = IsDurable, - transient_threshold = NextSeqId, - - len = DeltaCount1, - persistent_count = DeltaCount1, - - duration_target = infinity, - target_ram_msg_count = infinity, - ram_msg_count = 0, - ram_msg_count_prev = 0, - ram_index_count = 0, - out_counter = 0, - in_counter = 0, - rates = #rates { egress = {Now, 0}, - ingress = {Now, DeltaCount1}, - avg_egress = 0.0, - avg_ingress = 0.0, - timestamp = Now } }, - a(maybe_deltas_to_betas(State)). + PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, + PRef), + TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, + TRef), + {DeltaCount, IndexState} = + rabbit_queue_index:recover( + QueueName, Terms, + rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), + fun (Guid) -> + rabbit_msg_store:contains(Guid, PersistentClient) + end), + init(true, IndexState, DeltaCount, Terms1, + PersistentClient, TransientClient). terminate(State) -> State1 = #vqstate { persistent_count = PCount, index_state = IndexState, - msg_store_clients = {{MSCStateP, PRef}, - {MSCStateT, TRef}} } = + msg_store_clients = {MSCStateP, MSCStateT} } = remove_pending_ack(true, tx_commit_index(State)), - case MSCStateP of - undefined -> ok; - _ -> rabbit_msg_store:client_terminate( - MSCStateP, ?PERSISTENT_MSG_STORE) - end, - rabbit_msg_store:client_terminate(MSCStateT, ?TRANSIENT_MSG_STORE), + PRef = case MSCStateP of + undefined -> ok; + _ -> ok = rabbit_msg_store:client_terminate(MSCStateP), + rabbit_msg_store:client_ref(MSCStateP) + end, + rabbit_msg_store:client_terminate(MSCStateT), + TRef = rabbit_msg_store:client_ref(MSCStateT), Terms = [{persistent_ref, PRef}, {transient_ref, TRef}, {persistent_count, PCount}], @@ -458,37 +427,37 @@ delete_and_terminate(State) -> %% deleting it. {_PurgeCount, State1} = purge(State), State2 = #vqstate { index_state = IndexState, - msg_store_clients = {{MSCStateP, PRef}, - {MSCStateT, TRef}} } = + msg_store_clients = {MSCStateP, MSCStateT} } = remove_pending_ack(false, State1), IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState), case MSCStateP of undefined -> ok; - _ -> rabbit_msg_store:client_delete_and_terminate( - MSCStateP, ?PERSISTENT_MSG_STORE, PRef) + _ -> rabbit_msg_store:client_delete_and_terminate(MSCStateP) end, - rabbit_msg_store:client_delete_and_terminate( - MSCStateT, ?TRANSIENT_MSG_STORE, TRef), + rabbit_msg_store:client_delete_and_terminate(MSCStateT), a(State2 #vqstate { index_state = IndexState1, msg_store_clients = undefined }). -purge(State = #vqstate { q4 = Q4, - index_state = IndexState, - len = Len, - persistent_count = PCount }) -> +purge(State = #vqstate { q4 = Q4, + index_state = IndexState, + msg_store_clients = MSCState, + len = Len, + persistent_count = PCount }) -> %% TODO: when there are no pending acks, which is a common case, %% we could simply wipe the qi instead of issuing delivers and %% acks for all the messages. {LensByStore, IndexState1} = remove_queue_entries( fun rabbit_misc:queue_fold/3, Q4, - orddict:new(), IndexState), - {LensByStore1, State1 = #vqstate { q1 = Q1, index_state = IndexState2 }} = + orddict:new(), IndexState, MSCState), + {LensByStore1, State1 = #vqstate { q1 = Q1, + index_state = IndexState2, + msg_store_clients = MSCState1 }} = purge_betas_and_deltas(LensByStore, State #vqstate { q4 = queue:new(), index_state = IndexState1 }), {LensByStore2, IndexState3} = remove_queue_entries( fun rabbit_misc:queue_fold/3, Q1, - LensByStore1, IndexState2), + LensByStore1, IndexState2, MSCState1), PCount1 = PCount - find_persistent_count(LensByStore2), {Len, a(State1 #vqstate { q1 = queue:new(), index_state = IndexState3, @@ -523,13 +492,14 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, persistent_count = PCount1, pending_ack = PA1 })}. -fetch(AckRequired, State = #vqstate { q4 = Q4, - ram_msg_count = RamMsgCount, - out_counter = OutCount, - index_state = IndexState, - len = Len, - persistent_count = PCount, - pending_ack = PA }) -> +fetch(AckRequired, State = #vqstate { q4 = Q4, + ram_msg_count = RamMsgCount, + out_counter = OutCount, + index_state = IndexState, + msg_store_clients = MSCState, + len = Len, + persistent_count = PCount, + pending_ack = PA }) -> case queue:out(Q4) of {empty, _Q4} -> case fetch_from_q3_to_q4(State) of @@ -548,8 +518,9 @@ fetch(AckRequired, State = #vqstate { q4 = Q4, SeqId, IndexState), %% 2. Remove from msg_store and queue index, if necessary - MsgStore = find_msg_store(IsPersistent), - Rem = fun () -> ok = rabbit_msg_store:remove(MsgStore, [Guid]) end, + Rem = fun () -> + ok = msg_store_remove(MSCState, IsPersistent, [Guid]) + end, Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, IndexState2 = case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of @@ -581,7 +552,7 @@ fetch(AckRequired, State = #vqstate { q4 = Q4, end. ack(AckTags, State) -> - a(ack(fun rabbit_msg_store:remove/2, + a(ack(fun msg_store_remove/3, fun (_AckEntry, State1) -> State1 end, AckTags, State)). @@ -603,17 +574,18 @@ tx_ack(Txn, AckTags, State) -> store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }), State. -tx_rollback(Txn, State = #vqstate { durable = IsDurable }) -> +tx_rollback(Txn, State = #vqstate { durable = IsDurable, + msg_store_clients = MSCState }) -> #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), erase_tx(Txn), ok = case IsDurable of - true -> rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, - persistent_guids(Pubs)); + true -> msg_store_remove(MSCState, true, persistent_guids(Pubs)); false -> ok end, {lists:append(AckTags), a(State)}. -tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) -> +tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable, + msg_store_clients = MSCState }) -> #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), erase_tx(Txn), PubsOrdered = lists:reverse(Pubs), @@ -622,8 +594,9 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) -> HasPersistentPubs = PersistentGuids =/= [], {AckTags1, a(case IsDurable andalso HasPersistentPubs of - true -> ok = rabbit_msg_store:sync( - ?PERSISTENT_MSG_STORE, PersistentGuids, + true -> ok = msg_store_sync( + MSCState, true, + PersistentGuids, msg_store_callback(PersistentGuids, PubsOrdered, AckTags1, Fun)), State; @@ -633,14 +606,14 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) -> requeue(AckTags, State) -> a(reduce_memory_use( - ack(fun rabbit_msg_store:release/2, + ack(fun msg_store_release/3, fun (#msg_status { msg = Msg }, State1) -> {_SeqId, State2} = publish(Msg, true, false, State1), State2; ({IsPersistent, Guid}, State1) -> #vqstate { msg_store_clients = MSCState } = State1, {{ok, Msg = #basic_message{}}, MSCState1} = - read_from_msg_store(MSCState, IsPersistent, Guid), + msg_store_read(MSCState, IsPersistent, Guid), State2 = State1 #vqstate { msg_store_clients = MSCState1 }, {_SeqId, State3} = publish(Msg, true, true, State2), State3 @@ -795,22 +768,47 @@ msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }) -> is_persistent = IsPersistent, is_delivered = false, msg_on_disk = false, index_on_disk = false }. -find_msg_store(true) -> ?PERSISTENT_MSG_STORE; -find_msg_store(false) -> ?TRANSIENT_MSG_STORE. +with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) -> + {Result, MSCStateP1} = Fun(MSCStateP), + {Result, {MSCStateP1, MSCStateT}}; +with_msg_store_state({MSCStateP, MSCStateT}, false, Fun) -> + {Result, MSCStateT1} = Fun(MSCStateT), + {Result, {MSCStateP, MSCStateT1}}. + +with_immutable_msg_store_state(MSCState, IsPersistent, Fun) -> + {Res, MSCState} = with_msg_store_state(MSCState, IsPersistent, + fun (MSCState1) -> + {Fun(MSCState1), MSCState1} + end), + Res. -with_msg_store_state({{MSCStateP, PRef}, MSCStateT}, true, Fun) -> - {Result, MSCStateP1} = Fun(?PERSISTENT_MSG_STORE, MSCStateP), - {Result, {{MSCStateP1, PRef}, MSCStateT}}; -with_msg_store_state({MSCStateP, {MSCStateT, TRef}}, false, Fun) -> - {Result, MSCStateT1} = Fun(?TRANSIENT_MSG_STORE, MSCStateT), - {Result, {MSCStateP, {MSCStateT1, TRef}}}. +msg_store_client_init(MsgStore) -> + rabbit_msg_store:client_init(MsgStore, rabbit_guid:guid()). -read_from_msg_store(MSCState, IsPersistent, Guid) -> +msg_store_write(MSCState, IsPersistent, Guid, Msg) -> with_msg_store_state( MSCState, IsPersistent, - fun (MsgStore, MSCState1) -> - rabbit_msg_store:read(MsgStore, Guid, MSCState1) - end). + fun (MSCState1) -> rabbit_msg_store:write(Guid, Msg, MSCState1) end). + +msg_store_read(MSCState, IsPersistent, Guid) -> + with_msg_store_state( + MSCState, IsPersistent, + fun (MSCState1) -> rabbit_msg_store:read(Guid, MSCState1) end). + +msg_store_remove(MSCState, IsPersistent, Guids) -> + with_immutable_msg_store_state( + MSCState, IsPersistent, + fun (MCSState1) -> rabbit_msg_store:remove(Guids, MCSState1) end). + +msg_store_release(MSCState, IsPersistent, Guids) -> + with_immutable_msg_store_state( + MSCState, IsPersistent, + fun (MCSState1) -> rabbit_msg_store:release(Guids, MCSState1) end). + +msg_store_sync(MSCState, IsPersistent, Guids, Callback) -> + with_immutable_msg_store_state( + MSCState, IsPersistent, + fun (MSCState1) -> rabbit_msg_store:sync(Guids, Callback, MSCState1) end). maybe_write_delivered(false, _SeqId, IndexState) -> IndexState; @@ -892,6 +890,49 @@ update_rate(Now, Then, Count, {OThen, OCount}) -> %% Internal major helpers for Public API %%---------------------------------------------------------------------------- +init(IsDurable, IndexState, DeltaCount, Terms, + PersistentClient, TransientClient) -> + {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), + + DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount), + Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of + true -> ?BLANK_DELTA; + false -> #delta { start_seq_id = LowSeqId, + count = DeltaCount1, + end_seq_id = NextSeqId } + end, + Now = now(), + State = #vqstate { + q1 = queue:new(), + q2 = bpqueue:new(), + delta = Delta, + q3 = bpqueue:new(), + q4 = queue:new(), + next_seq_id = NextSeqId, + pending_ack = dict:new(), + index_state = IndexState1, + msg_store_clients = {PersistentClient, TransientClient}, + on_sync = ?BLANK_SYNC, + durable = IsDurable, + transient_threshold = NextSeqId, + + len = DeltaCount1, + persistent_count = DeltaCount1, + + duration_target = infinity, + target_ram_msg_count = infinity, + ram_msg_count = 0, + ram_msg_count_prev = 0, + ram_index_count = 0, + out_counter = 0, + in_counter = 0, + rates = #rates { egress = {Now, 0}, + ingress = {Now, DeltaCount1}, + avg_egress = 0.0, + avg_ingress = 0.0, + timestamp = Now } }, + a(maybe_deltas_to_betas(State)). + msg_store_callback(PersistentGuids, Pubs, AckTags, Fun) -> Self = self(), F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( @@ -900,13 +941,17 @@ msg_store_callback(PersistentGuids, Pubs, AckTags, Fun) -> end) end, fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler( - fun () -> rabbit_msg_store:remove( - ?PERSISTENT_MSG_STORE, + fun () -> remove_persistent_messages( PersistentGuids) end, F) end) end. +remove_persistent_messages(Guids) -> + PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE), + ok = rabbit_msg_store:remove(Guids, PersistentClient), + rabbit_msg_store:client_delete_and_terminate(PersistentClient). + tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, State = #vqstate { on_sync = OnSync = #sync { @@ -965,13 +1010,14 @@ tx_commit_index(State = #vqstate { on_sync = #sync { State1 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }). purge_betas_and_deltas(LensByStore, - State = #vqstate { q3 = Q3, - index_state = IndexState }) -> + State = #vqstate { q3 = Q3, + index_state = IndexState, + msg_store_clients = MSCState }) -> case bpqueue:is_empty(Q3) of true -> {LensByStore, State}; - false -> {LensByStore1, IndexState1} = remove_queue_entries( - fun beta_fold/3, Q3, - LensByStore, IndexState), + false -> {LensByStore1, IndexState1} = + remove_queue_entries(fun beta_fold/3, Q3, + LensByStore, IndexState, MSCState), purge_betas_and_deltas(LensByStore1, maybe_deltas_to_betas( State #vqstate { @@ -979,11 +1025,11 @@ purge_betas_and_deltas(LensByStore, index_state = IndexState1 })) end. -remove_queue_entries(Fold, Q, LensByStore, IndexState) -> +remove_queue_entries(Fold, Q, LensByStore, IndexState, MSCState) -> {GuidsByStore, Delivers, Acks} = Fold(fun remove_queue_entries1/2, {orddict:new(), [], []}, Q), - ok = orddict:fold(fun (MsgStore, Guids, ok) -> - rabbit_msg_store:remove(MsgStore, Guids) + ok = orddict:fold(fun (IsPersistent, Guids, ok) -> + msg_store_remove(MSCState, IsPersistent, Guids) end, ok, GuidsByStore), {sum_guids_by_store_to_len(LensByStore, GuidsByStore), rabbit_queue_index:ack(Acks, @@ -995,8 +1041,7 @@ remove_queue_entries1( index_on_disk = IndexOnDisk, is_persistent = IsPersistent }, {GuidsByStore, Delivers, Acks}) -> {case MsgOnDisk of - true -> rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, - GuidsByStore); + true -> rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore); false -> GuidsByStore end, cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), @@ -1004,8 +1049,8 @@ remove_queue_entries1( sum_guids_by_store_to_len(LensByStore, GuidsByStore) -> orddict:fold( - fun (MsgStore, Guids, LensByStore1) -> - orddict:update_counter(MsgStore, length(Guids), LensByStore1) + fun (IsPersistent, Guids, LensByStore1) -> + orddict:update_counter(IsPersistent, length(Guids), LensByStore1) end, LensByStore, GuidsByStore). %%---------------------------------------------------------------------------- @@ -1043,16 +1088,11 @@ maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { msg = Msg, guid = Guid, is_persistent = IsPersistent }, MSCState) when Force orelse IsPersistent -> - {ok, MSCState1} = - with_msg_store_state( - MSCState, IsPersistent, - fun (MsgStore, MSCState2) -> - Msg1 = Msg #basic_message { - %% don't persist any recoverable decoded properties - content = rabbit_binary_parser:clear_decoded_content( - Msg #basic_message.content)}, - rabbit_msg_store:write(MsgStore, Guid, Msg1, MSCState2) - end), + Msg1 = Msg #basic_message { + %% don't persist any recoverable decoded properties + content = rabbit_binary_parser:clear_decoded_content( + Msg #basic_message.content)}, + {ok, MSCState1} = msg_store_write(MSCState, IsPersistent, Guid, Msg1), {MsgStatus #msg_status { msg_on_disk = true }, MSCState1}; maybe_write_msg_to_disk(_Force, MsgStatus, MSCState) -> {MsgStatus, MSCState}. @@ -1098,22 +1138,23 @@ record_pending_ack(#msg_status { guid = Guid, seq_id = SeqId, dict:store(SeqId, AckEntry, PA). remove_pending_ack(KeepPersistent, - State = #vqstate { pending_ack = PA, - index_state = IndexState }) -> + State = #vqstate { pending_ack = PA, + index_state = IndexState, + msg_store_clients = MSCState }) -> {SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3, {[], orddict:new()}, PA), State1 = State #vqstate { pending_ack = dict:new() }, case KeepPersistent of - true -> case orddict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of + true -> case orddict:find(false, GuidsByStore) of error -> State1; - {ok, Guids} -> ok = rabbit_msg_store:remove( - ?TRANSIENT_MSG_STORE, Guids), + {ok, Guids} -> ok = msg_store_remove(MSCState, false, + Guids), State1 end; false -> IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), ok = orddict:fold( - fun (MsgStore, Guids, ok) -> - rabbit_msg_store:remove(MsgStore, Guids) + fun (IsPersistent, Guids, ok) -> + msg_store_remove(MSCState, IsPersistent, Guids) end, ok, GuidsByStore), State1 #vqstate { index_state = IndexState1 } end. @@ -1121,8 +1162,10 @@ remove_pending_ack(KeepPersistent, ack(_MsgStoreFun, _Fun, [], State) -> State; ack(MsgStoreFun, Fun, AckTags, State) -> - {{SeqIds, GuidsByStore}, State1 = #vqstate { index_state = IndexState, - persistent_count = PCount }} = + {{SeqIds, GuidsByStore}, + State1 = #vqstate { index_state = IndexState, + msg_store_clients = MSCState, + persistent_count = PCount }} = lists:foldl( fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA }}) -> {ok, AckEntry} = dict:find(SeqId, PA), @@ -1131,8 +1174,8 @@ ack(MsgStoreFun, Fun, AckTags, State) -> pending_ack = dict:erase(SeqId, PA) })} end, {{[], orddict:new()}, State}, AckTags), IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), - ok = orddict:fold(fun (MsgStore, Guids, ok) -> - MsgStoreFun(MsgStore, Guids) + ok = orddict:fold(fun (IsPersistent, Guids, ok) -> + MsgStoreFun(MSCState, IsPersistent, Guids) end, ok, GuidsByStore), PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len( orddict:new(), GuidsByStore)), @@ -1145,10 +1188,10 @@ accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS Acc; accumulate_ack(SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) -> {cons_if(IsPersistent, SeqId, SeqIdsAcc), - rabbit_misc:orddict_cons(find_msg_store(IsPersistent), Guid, Dict)}. + rabbit_misc:orddict_cons(IsPersistent, Guid, Dict)}. find_persistent_count(LensByStore) -> - case orddict:find(?PERSISTENT_MSG_STORE, LensByStore) of + case orddict:find(true, LensByStore) of error -> 0; {ok, Len} -> Len end. @@ -1261,7 +1304,7 @@ fetch_from_q3_to_q4(State = #vqstate { msg = undefined, guid = Guid, is_persistent = IsPersistent }}, Q3a} -> {{ok, Msg = #basic_message {}}, MSCState1} = - read_from_msg_store(MSCState, IsPersistent, Guid), + msg_store_read(MSCState, IsPersistent, Guid), Q4a = queue:in(m(MsgStatus #msg_status { msg = Msg }), Q4), RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk), true = RamIndexCount1 >= 0, %% ASSERTION -- cgit v1.2.1