From 4af7804b122b33a034c6d390a4fbb5fd4c73b9db Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 13 Mar 2014 19:14:21 +0000 Subject: invoke credit_flow:peer_down on, err, DOWN. *only* Thus preventing potential leaks and deadlocks. Also, clean up 'clients' in clear_client, and thus client_terminate, which was previously missing and thus the source of another potential leak. --- src/rabbit_msg_store.erl | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 9a4439a7..627335a5 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -641,9 +641,11 @@ client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts, end. clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM, - dying_clients = DyingClients }) -> + clients = Clients, + dying_clients = DyingClients }) -> State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM), - dying_clients = sets:del_element(CRef, DyingClients) }. + clients = dict:erase(CRef, Clients), + dying_clients = sets:del_element(CRef, DyingClients) }. %%---------------------------------------------------------------------------- @@ -781,6 +783,7 @@ handle_call({new_client_state, CRef, CPid, MsgOnDiskFun, CloseFDsFun}, _From, clients = Clients, gc_pid = GCPid }) -> Clients1 = dict:store(CRef, {CPid, MsgOnDiskFun, CloseFDsFun}, Clients), + erlang:monitor(process, CPid), reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts}, State #msstate { clients = Clients1 }); @@ -802,12 +805,8 @@ handle_cast({client_dying, CRef}, noreply(write_message(CRef, <<>>, State #msstate { dying_clients = DyingClients1 })); -handle_cast({client_delete, CRef}, - State = #msstate { clients = Clients }) -> - {CPid, _, _} = dict:fetch(CRef, Clients), - credit_flow:peer_down(CPid), - State1 = State #msstate { clients = dict:erase(CRef, Clients) }, - noreply(remove_message(CRef, CRef, clear_client(CRef, State1))); +handle_cast({client_delete, CRef}, State) -> + noreply(remove_message(CRef, CRef, clear_client(CRef, State))); handle_cast({write, CRef, MsgId, Flow}, State = #msstate { cur_file_cache_ets = CurFileCacheEts, @@ -888,6 +887,10 @@ handle_info(sync, State) -> handle_info(timeout, State) -> noreply(internal_sync(State)); +handle_info({'DOWN', _MRef, Pid, _Reason}, State) -> + credit_flow:peer_down(Pid), + noreply(State); + handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. -- cgit v1.2.1 From f7a717c1df0e5d811305e6b054811f8d514a5272 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 13 Mar 2014 19:25:59 +0000 Subject: turns out that cleaning up 'clients' in client_terminate is bad since it breaks clean shutdown/recovery, which relies on the CRefs in 'clients'. --- src/rabbit_msg_store.erl | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 627335a5..2322f0f9 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -641,11 +641,9 @@ client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts, end. clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM, - clients = Clients, - dying_clients = DyingClients }) -> + dying_clients = DyingClients }) -> State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM), - clients = dict:erase(CRef, Clients), - dying_clients = sets:del_element(CRef, DyingClients) }. + dying_clients = sets:del_element(CRef, DyingClients) }. %%---------------------------------------------------------------------------- @@ -805,8 +803,10 @@ handle_cast({client_dying, CRef}, noreply(write_message(CRef, <<>>, State #msstate { dying_clients = DyingClients1 })); -handle_cast({client_delete, CRef}, State) -> - noreply(remove_message(CRef, CRef, clear_client(CRef, State))); +handle_cast({client_delete, CRef}, + State = #msstate { clients = Clients }) -> + State1 = State #msstate { clients = dict:erase(CRef, Clients) }, + noreply(remove_message(CRef, CRef, clear_client(CRef, State1))); handle_cast({write, CRef, MsgId, Flow}, State = #msstate { cur_file_cache_ets = CurFileCacheEts, -- cgit v1.2.1 From 6a65d9b62a55a896db63706aa015c5523a7182bb Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 13 Mar 2014 19:32:13 +0000 Subject: correctness++ --- src/rabbit_msg_store.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 2322f0f9..1562050c 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -887,7 +887,7 @@ handle_info(sync, State) -> handle_info(timeout, State) -> noreply(internal_sync(State)); -handle_info({'DOWN', _MRef, Pid, _Reason}, State) -> +handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) -> credit_flow:peer_down(Pid), noreply(State); -- cgit v1.2.1