From 6570ed681706498d09cf4783ce24727ffe5a7f0d Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Tue, 25 Jan 2011 19:00:36 +0000 Subject: If we don't actually write the msg and the msg isn't in the current file then delete it if its pending write count is 0 --- src/rabbit_msg_store.erl | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 529e3e07..ac4cda99 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -750,12 +750,13 @@ handle_cast({write, CRef, Guid}, true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), case should_mask_action(CRef, Guid, State) of - {true, _Location} -> + {true, Loc} -> + ok = maybe_remove_from_cache(Guid, Loc, Msg, State), noreply(State); {false, not_found} -> write_message(CRef, Guid, Msg, State); {Mask, #msg_location { ref_count = 0, file = File, - total_size = TotalSize }} -> + total_size = TotalSize } = Loc} -> case {Mask, ets:lookup(FileSummaryEts, File)} of {false, [#file_summary { locked = true }]} -> ok = index_delete(Guid, State), @@ -765,15 +766,18 @@ handle_cast({write, CRef, Guid}, %% message, but as it is being GC'd currently, %% we'll have to write a new copy, which will then %% be younger, so ignore this write. + ok = maybe_remove_from_cache(Guid, Loc, Msg, State), noreply(State); {_Mask, [#file_summary {}]} -> ok = index_update_ref_count(Guid, 1, State), + ok = maybe_remove_from_cache(Guid, Loc, Msg, State), State1 = client_confirm_if_on_disk(CRef, Guid, File, State), noreply(adjust_valid_total_size(File, TotalSize, State1)) end; - {_Mask, #msg_location { ref_count = RefCount, file = File }} -> + {_Mask, #msg_location { ref_count = RefCount, file = File } = Loc} -> %% We already know about it, just update counter. Only %% update field otherwise bad interaction with concurrent GC + ok = maybe_remove_from_cache(Guid, Loc, Msg, State), ok = index_update_ref_count(Guid, RefCount + 1, State), noreply(client_confirm_if_on_disk(CRef, Guid, File, State)) end; @@ -1108,6 +1112,14 @@ safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) -> safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) -> safe_ets_update_counter(Tab, Key, UpdateOp, fun (_) -> ok end, FailThunk). +maybe_remove_from_cache(_Guid, #msg_location { file = CurFile }, _Msg, + #msstate { current_file = CurFile }) -> + ok; +maybe_remove_from_cache(Guid, _Location, Msg, + #msstate { cur_file_cache_ets = CurFileCacheEts }) -> + true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}), + ok. + adjust_valid_total_size(File, Delta, State = #msstate { sum_valid_data = SumValid, file_summary_ets = FileSummaryEts }) -> -- cgit v1.2.1 From 89213a4aab4609fdfdedcb181ad5c6fe4eb8e842 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Wed, 26 Jan 2011 12:37:55 +0000 Subject: return State from write_message rather than it calling noreply This makes the API more consistent (cf read_message etc). Also, shuffle order of some write actions for consistency --- src/rabbit_msg_store.erl | 79 +++++++++++++++++++++++++----------------------- 1 file changed, 41 insertions(+), 38 deletions(-) diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index ac4cda99..448ca47a 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -738,7 +738,8 @@ handle_call({contains, Guid}, From, State) -> handle_cast({client_dying, CRef}, State = #msstate { dying_clients = DyingClients }) -> DyingClients1 = sets:add_element(CRef, DyingClients), - write_message(CRef, <<>>, State #msstate { dying_clients = DyingClients1 }); + noreply(write_message(CRef, <<>>, + State #msstate { dying_clients = DyingClients1 })); handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) -> State1 = State #msstate { clients = dict:erase(CRef, Clients) }, @@ -749,38 +750,41 @@ handle_cast({write, CRef, Guid}, cur_file_cache_ets = CurFileCacheEts }) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), - case should_mask_action(CRef, Guid, State) of - {true, Loc} -> - ok = maybe_remove_from_cache(Guid, Loc, Msg, State), - noreply(State); - {false, not_found} -> - write_message(CRef, Guid, Msg, State); - {Mask, #msg_location { ref_count = 0, file = File, - total_size = TotalSize } = Loc} -> - case {Mask, ets:lookup(FileSummaryEts, File)} of - {false, [#file_summary { locked = true }]} -> - ok = index_delete(Guid, State), - write_message(CRef, Guid, Msg, State); - {false_if_increment, [#file_summary { locked = true }]} -> - %% The msg for Guid is older than the client death - %% message, but as it is being GC'd currently, - %% we'll have to write a new copy, which will then - %% be younger, so ignore this write. - ok = maybe_remove_from_cache(Guid, Loc, Msg, State), - noreply(State); - {_Mask, [#file_summary {}]} -> - ok = index_update_ref_count(Guid, 1, State), - ok = maybe_remove_from_cache(Guid, Loc, Msg, State), - State1 = client_confirm_if_on_disk(CRef, Guid, File, State), - noreply(adjust_valid_total_size(File, TotalSize, State1)) - end; - {_Mask, #msg_location { ref_count = RefCount, file = File } = Loc} -> - %% We already know about it, just update counter. Only - %% update field otherwise bad interaction with concurrent GC - ok = maybe_remove_from_cache(Guid, Loc, Msg, State), - ok = index_update_ref_count(Guid, RefCount + 1, State), - noreply(client_confirm_if_on_disk(CRef, Guid, File, State)) - end; + noreply( + case should_mask_action(CRef, Guid, State) of + {true, Loc} -> + ok = maybe_remove_from_cache(Guid, Loc, Msg, State), + State; + {false, not_found} -> + write_message(CRef, Guid, Msg, State); + {Mask, #msg_location { ref_count = 0, file = File, + total_size = TotalSize } = Loc} -> + case {Mask, ets:lookup(FileSummaryEts, File)} of + {false, [#file_summary { locked = true }]} -> + ok = index_delete(Guid, State), + write_message(CRef, Guid, Msg, State); + {false_if_increment, [#file_summary { locked = true }]} -> + %% The msg for Guid is older than the client + %% death message, but as it is being GC'd + %% currently we'll have to write a new copy, + %% which will then be younger, so ignore this + %% write. + ok = maybe_remove_from_cache(Guid, Loc, Msg, State), + State; + {_Mask, [#file_summary {}]} -> + ok = index_update_ref_count(Guid, 1, State), + ok = maybe_remove_from_cache(Guid, Loc, Msg, State), + State1 = adjust_valid_total_size(File, TotalSize, State), + client_confirm_if_on_disk(CRef, Guid, File, State1) + end; + {_Mask, #msg_location { ref_count = RefCount, file = File } = Loc} -> + %% We already know about it, just update counter. Only + %% update field otherwise bad interaction with + %% concurrent GC + ok = index_update_ref_count(Guid, RefCount + 1, State), + ok = maybe_remove_from_cache(Guid, Loc, Msg, State), + client_confirm_if_on_disk(CRef, Guid, File, State) + end); handle_cast({remove, CRef, Guids}, State) -> State1 = lists:foldl( @@ -947,11 +951,10 @@ write_message(Guid, Msg, [_,_] = ets:update_counter(FileSummaryEts, CurFile, [{#file_summary.valid_total_size, TotalSize}, {#file_summary.file_size, TotalSize}]), - NextOffset = CurOffset + TotalSize, - noreply(maybe_roll_to_new_file( - NextOffset, State #msstate { - sum_valid_data = SumValid + TotalSize, - sum_file_size = SumFileSize + TotalSize })). + maybe_roll_to_new_file(CurOffset + TotalSize, + State #msstate { + sum_valid_data = SumValid + TotalSize, + sum_file_size = SumFileSize + TotalSize }). read_message(Guid, From, State = #msstate { dedup_cache_ets = DedupCacheEts }) -> -- cgit v1.2.1 From 08478a808204cf9033cc486edd91eaab6336568b Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Wed, 26 Jan 2011 13:31:00 +0000 Subject: refactor 'write' handler for clarity --- src/rabbit_msg_store.erl | 72 +++++++++++++++++++++++++----------------------- 1 file changed, 38 insertions(+), 34 deletions(-) diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 448ca47a..18227be1 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -746,44 +746,19 @@ handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) -> noreply(remove_message(CRef, CRef, clear_client(CRef, State1))); handle_cast({write, CRef, Guid}, - State = #msstate { file_summary_ets = FileSummaryEts, - cur_file_cache_ets = CurFileCacheEts }) -> + State = #msstate { cur_file_cache_ets = CurFileCacheEts }) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), noreply( - case should_mask_action(CRef, Guid, State) of - {true, Loc} -> - ok = maybe_remove_from_cache(Guid, Loc, Msg, State), + case write_action(should_mask_action(CRef, Guid, State), Guid, State) of + {write, State1} -> + write_message(CRef, Guid, Msg, State1); + {ignore, Loc, State1} -> + ok = maybe_remove_from_cache(Guid, Loc, Msg, State1), State; - {false, not_found} -> - write_message(CRef, Guid, Msg, State); - {Mask, #msg_location { ref_count = 0, file = File, - total_size = TotalSize } = Loc} -> - case {Mask, ets:lookup(FileSummaryEts, File)} of - {false, [#file_summary { locked = true }]} -> - ok = index_delete(Guid, State), - write_message(CRef, Guid, Msg, State); - {false_if_increment, [#file_summary { locked = true }]} -> - %% The msg for Guid is older than the client - %% death message, but as it is being GC'd - %% currently we'll have to write a new copy, - %% which will then be younger, so ignore this - %% write. - ok = maybe_remove_from_cache(Guid, Loc, Msg, State), - State; - {_Mask, [#file_summary {}]} -> - ok = index_update_ref_count(Guid, 1, State), - ok = maybe_remove_from_cache(Guid, Loc, Msg, State), - State1 = adjust_valid_total_size(File, TotalSize, State), - client_confirm_if_on_disk(CRef, Guid, File, State1) - end; - {_Mask, #msg_location { ref_count = RefCount, file = File } = Loc} -> - %% We already know about it, just update counter. Only - %% update field otherwise bad interaction with - %% concurrent GC - ok = index_update_ref_count(Guid, RefCount + 1, State), - ok = maybe_remove_from_cache(Guid, Loc, Msg, State), - client_confirm_if_on_disk(CRef, Guid, File, State) + {confirm, Loc, File, State1} -> + ok = maybe_remove_from_cache(Guid, Loc, Msg, State1), + client_confirm_if_on_disk(CRef, Guid, File, State1) end); handle_cast({remove, CRef, Guids}, State) -> @@ -932,6 +907,35 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, [client_confirm(CRef, Guids, written, State1) || {CRef, Guids} <- CGs], State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }. +write_action({true, Loc}, _Guid, State) -> + {ignore, Loc, State}; +write_action({false, not_found}, _Guid, State) -> + {write, State}; +write_action({Mask, #msg_location { ref_count = 0, file = File, + total_size = TotalSize } = Loc}, + Guid, State = #msstate { file_summary_ets = FileSummaryEts }) -> + case {Mask, ets:lookup(FileSummaryEts, File)} of + {false, [#file_summary { locked = true }]} -> + ok = index_delete(Guid, State), + {write, State}; + {false_if_increment, [#file_summary { locked = true }]} -> + %% The msg for Guid is older than the client death + %% message, but as it is being GC'd currently we'll have + %% to write a new copy, which will then be younger, so + %% ignore this write. + {ignore, Loc, State}; + {_Mask, [#file_summary {}]} -> + ok = index_update_ref_count(Guid, 1, State), + State1 = adjust_valid_total_size(File, TotalSize, State), + {confirm, Loc, File, State1} + end; +write_action({_Mask, #msg_location { ref_count = RefCount, file = File } = Loc}, + Guid, State) -> + ok = index_update_ref_count(Guid, RefCount + 1, State), + %% We already know about it, just update counter. Only update + %% field otherwise bad interaction with concurrent GC + {confirm, Loc, File, State}. + write_message(CRef, Guid, Msg, State) -> write_message(Guid, Msg, record_pending_confirm(CRef, Guid, State)). -- cgit v1.2.1 From bef227e0c6049c1af04b5e6152e5c8981c9f4348 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Wed, 26 Jan 2011 13:42:44 +0000 Subject: simplify --- src/rabbit_msg_store.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 18227be1..df403fbf 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -756,7 +756,7 @@ handle_cast({write, CRef, Guid}, {ignore, Loc, State1} -> ok = maybe_remove_from_cache(Guid, Loc, Msg, State1), State; - {confirm, Loc, File, State1} -> + {confirm, Loc = #msg_location { file = File }, State1} -> ok = maybe_remove_from_cache(Guid, Loc, Msg, State1), client_confirm_if_on_disk(CRef, Guid, File, State1) end); @@ -927,14 +927,14 @@ write_action({Mask, #msg_location { ref_count = 0, file = File, {_Mask, [#file_summary {}]} -> ok = index_update_ref_count(Guid, 1, State), State1 = adjust_valid_total_size(File, TotalSize, State), - {confirm, Loc, File, State1} + {confirm, Loc, State1} end; -write_action({_Mask, #msg_location { ref_count = RefCount, file = File } = Loc}, +write_action({_Mask, #msg_location { ref_count = RefCount } = Loc}, Guid, State) -> ok = index_update_ref_count(Guid, RefCount + 1, State), %% We already know about it, just update counter. Only update %% field otherwise bad interaction with concurrent GC - {confirm, Loc, File, State}. + {confirm, Loc, State}. write_message(CRef, Guid, Msg, State) -> write_message(Guid, Msg, record_pending_confirm(CRef, Guid, State)). -- cgit v1.2.1 From cd36137ba4692d92eaba9b1a5cedc6440c96af71 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Wed, 26 Jan 2011 14:31:49 +0000 Subject: inline two funs for compactness and clarity --- src/rabbit_msg_store.erl | 54 +++++++++++++++++++++--------------------------- 1 file changed, 23 insertions(+), 31 deletions(-) diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index df403fbf..e9c356e1 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -753,12 +753,20 @@ handle_cast({write, CRef, Guid}, case write_action(should_mask_action(CRef, Guid, State), Guid, State) of {write, State1} -> write_message(CRef, Guid, Msg, State1); - {ignore, Loc, State1} -> - ok = maybe_remove_from_cache(Guid, Loc, Msg, State1), - State; - {confirm, Loc = #msg_location { file = File }, State1} -> - ok = maybe_remove_from_cache(Guid, Loc, Msg, State1), - client_confirm_if_on_disk(CRef, Guid, File, State1) + {ignore, CurFile, State1 = #msstate { current_file = CurFile }} -> + State1; + {ignore, _File, State1} -> + true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}), + State1; + {confirm, CurFile, State1 = #msstate { current_file = CurFile }}-> + record_pending_confirm(CRef, Guid, State1); + {confirm, _File, State1} -> + true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}), + update_pending_confirms( + fun (MsgOnDiskFun, CTG) -> + MsgOnDiskFun(gb_sets:singleton(Guid), written), + CTG + end, CRef, State1) end); handle_cast({remove, CRef, Guids}, State) -> @@ -907,12 +915,14 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, [client_confirm(CRef, Guids, written, State1) || {CRef, Guids} <- CGs], State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }. -write_action({true, Loc}, _Guid, State) -> - {ignore, Loc, State}; +write_action({true, not_found}, _Guid, State) -> + {ignore, undefined, State}; +write_action({true, #msg_location { file = File }}, _Guid, State) -> + {ignore, File, State}; write_action({false, not_found}, _Guid, State) -> {write, State}; write_action({Mask, #msg_location { ref_count = 0, file = File, - total_size = TotalSize } = Loc}, + total_size = TotalSize }}, Guid, State = #msstate { file_summary_ets = FileSummaryEts }) -> case {Mask, ets:lookup(FileSummaryEts, File)} of {false, [#file_summary { locked = true }]} -> @@ -923,18 +933,18 @@ write_action({Mask, #msg_location { ref_count = 0, file = File, %% message, but as it is being GC'd currently we'll have %% to write a new copy, which will then be younger, so %% ignore this write. - {ignore, Loc, State}; + {ignore, File, State}; {_Mask, [#file_summary {}]} -> ok = index_update_ref_count(Guid, 1, State), State1 = adjust_valid_total_size(File, TotalSize, State), - {confirm, Loc, State1} + {confirm, File, State1} end; -write_action({_Mask, #msg_location { ref_count = RefCount } = Loc}, +write_action({_Mask, #msg_location { ref_count = RefCount, file = File }}, Guid, State) -> ok = index_update_ref_count(Guid, RefCount + 1, State), %% We already know about it, just update counter. Only update %% field otherwise bad interaction with concurrent GC - {confirm, Loc, State}. + {confirm, File, State}. write_message(CRef, Guid, Msg, State) -> write_message(Guid, Msg, record_pending_confirm(CRef, Guid, State)). @@ -1119,14 +1129,6 @@ safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) -> safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) -> safe_ets_update_counter(Tab, Key, UpdateOp, fun (_) -> ok end, FailThunk). -maybe_remove_from_cache(_Guid, #msg_location { file = CurFile }, _Msg, - #msstate { current_file = CurFile }) -> - ok; -maybe_remove_from_cache(Guid, _Location, Msg, - #msstate { cur_file_cache_ets = CurFileCacheEts }) -> - true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}), - ok. - adjust_valid_total_size(File, Delta, State = #msstate { sum_valid_data = SumValid, file_summary_ets = FileSummaryEts }) -> @@ -1153,16 +1155,6 @@ record_pending_confirm(CRef, Guid, State) -> gb_sets:singleton(Guid), CTG) end, CRef, State). -client_confirm_if_on_disk(CRef, Guid, CurFile, - State = #msstate { current_file = CurFile }) -> - record_pending_confirm(CRef, Guid, State); -client_confirm_if_on_disk(CRef, Guid, _File, State) -> - update_pending_confirms( - fun (MsgOnDiskFun, CTG) -> - MsgOnDiskFun(gb_sets:singleton(Guid), written), - CTG - end, CRef, State). - client_confirm(CRef, Guids, ActionTaken, State) -> update_pending_confirms( fun (MsgOnDiskFun, CTG) -> -- cgit v1.2.1 From 1912e5faf809ab09a6bba861366693fd807eaafa Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Wed, 26 Jan 2011 15:44:05 +0000 Subject: If there is an mnesia entry for the queue and the qpid is dead, then remove the mnesia entry, but still return the existing queue --- src/rabbit_amqqueue.erl | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ad9e3ce6..95ac1301 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -215,8 +215,12 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> [_] -> %% Q exists on stopped node rabbit_misc:const(not_found) end; - [ExistingQ] -> - rabbit_misc:const(ExistingQ) + [ExistingQ = #amqqueue{pid = QPid}] -> + case is_process_alive(QPid) of + true -> rabbit_misc:const(ExistingQ); + false -> TailFun = internal_delete(QueueName), + fun (Tx) -> TailFun(Tx), ExistingQ end + end end end). @@ -432,17 +436,15 @@ internal_delete1(QueueName) -> rabbit_binding:remove_for_destination(QueueName). internal_delete(QueueName) -> - rabbit_misc:execute_mnesia_transaction( + rabbit_misc:execute_mnesia_tx_with_tail( fun () -> case mnesia:wread({rabbit_queue, QueueName}) of - [] -> {error, not_found}; - [_] -> internal_delete1(QueueName) + [] -> rabbit_misc:const({error, not_found}); + [_] -> Deletions = internal_delete1(QueueName), + fun (Tx) -> ok = rabbit_binding:process_deletions( + Deletions, Tx) + end end - end, - fun ({error, _} = Err, _Tx) -> - Err; - (Deletions, Tx) -> - ok = rabbit_binding:process_deletions(Deletions, Tx) end). maybe_run_queue_via_backing_queue(QPid, Fun) -> -- cgit v1.2.1 From ba418f0f7c06821af7ce7d3719f35895830c1acd Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 27 Jan 2011 12:21:44 +0000 Subject: Correct specs --- src/rabbit_amqqueue.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 95ac1301..a6da551d 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -137,7 +137,9 @@ -> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())). -spec(internal_delete/1 :: (name()) -> rabbit_types:ok_or_error('not_found') | - rabbit_types:connection_exit()). + rabbit_types:connection_exit() | + fun ((boolean()) -> rabbit_types:ok_or_error('not_found') | + rabbit_types:connection_exit())). -spec(maybe_run_queue_via_backing_queue/2 :: (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok'). -spec(maybe_run_queue_via_backing_queue_async/2 :: -- cgit v1.2.1 From 4c27739ee6b528d503673502d61da3a23ad510f0 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 27 Jan 2011 15:13:44 +0000 Subject: Add --disabled-login --- packaging/debs/Debian/debian/postinst | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packaging/debs/Debian/debian/postinst b/packaging/debs/Debian/debian/postinst index 05fb179c..b1cf809f 100644 --- a/packaging/debs/Debian/debian/postinst +++ b/packaging/debs/Debian/debian/postinst @@ -26,7 +26,8 @@ fi # create rabbitmq user if ! getent passwd rabbitmq >/dev/null; then adduser --system --ingroup rabbitmq --home /var/lib/rabbitmq \ - --no-create-home --gecos "RabbitMQ messaging server" rabbitmq + --no-create-home --gecos "RabbitMQ messaging server" \ + --disabled-login rabbitmq fi chown -R rabbitmq:rabbitmq /var/lib/rabbitmq -- cgit v1.2.1 From 35f98339bedae520043869bce8d9f4ef112b4bec Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 27 Jan 2011 15:15:15 +0000 Subject: tabs to spaces. --- packaging/debs/Debian/debian/postinst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packaging/debs/Debian/debian/postinst b/packaging/debs/Debian/debian/postinst index b1cf809f..134f16ee 100644 --- a/packaging/debs/Debian/debian/postinst +++ b/packaging/debs/Debian/debian/postinst @@ -26,7 +26,7 @@ fi # create rabbitmq user if ! getent passwd rabbitmq >/dev/null; then adduser --system --ingroup rabbitmq --home /var/lib/rabbitmq \ - --no-create-home --gecos "RabbitMQ messaging server" \ + --no-create-home --gecos "RabbitMQ messaging server" \ --disabled-login rabbitmq fi -- cgit v1.2.1 From 4cc865d821020aedde20bdd214d73ef6e28d72f1 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 27 Jan 2011 16:39:27 +0000 Subject: Stop the channel crashing if we requeue to a dead queue --- src/rabbit_channel.erl | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 91559ea6..cc3cfbf4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -736,14 +736,18 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, handle_method(#'basic.recover_async'{requeue = true}, _, State = #ch{unacked_message_q = UAMQ}) -> + OkFun = fun () -> ok end, ok = fold_per_queue( fun (QPid, MsgIds, ok) -> %% The Qpid python test suite incorrectly assumes %% that messages will be requeued in their original %% order. To keep it happy we reverse the id list %% since we are given them in reverse order. - rabbit_amqqueue:requeue( - QPid, lists:reverse(MsgIds), self()) + rabbit_misc:with_exit_handler( + OkFun, fun () -> + rabbit_amqqueue:requeue( + QPid, lists:reverse(MsgIds), self()) + end) end, ok, UAMQ), %% No answer required - basic.recover is the newer, synchronous %% variant of this method -- cgit v1.2.1