From 488057258e8bd53a62348bd82ae0c70c268638ad Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Sun, 20 Jan 2013 13:20:53 +0000 Subject: cosmetic: move spec of internal function and make it more precise --- src/rabbit_mirror_queue_slave.erl | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 9f12b34e..867aa2ed 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -37,18 +37,10 @@ -include("rabbit.hrl"). -%%---------------------------------------------------------------------------- - -include("gm_specs.hrl"). --ifdef(use_specs). -%% Shut dialyzer up --spec(promote_me/2 :: (_, _) -> no_return()). --endif. - %%---------------------------------------------------------------------------- - -define(CREATION_EVENT_KEYS, [pid, name, @@ -79,6 +71,8 @@ depth_delta }). +%%---------------------------------------------------------------------------- + start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). set_maximum_since_use(QPid, Age) -> @@ -469,6 +463,9 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) -> handle_process_result({ok, State}) -> noreply(State); handle_process_result({stop, State}) -> {stop, normal, State}. +-ifdef(use_specs). +-spec(promote_me/2 :: ({pid(), term()}, #state{}) -> no_return()). +-endif. promote_me(From, #state { q = Q = #amqqueue { name = QName }, gm = GM, backing_queue = BQ, -- cgit v1.2.1 From 154510f6dc1ae3146b21b02d2c07a7a9d3ff8183 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 21 Jan 2013 13:06:59 +0000 Subject: USe pg_local rather than an ets table. --- src/rabbit_networking.erl | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index ee430fb4..080e0987 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -41,8 +41,6 @@ -define(FIRST_TEST_BIND_PORT, 10000). --define(CONNECTION_TABLE, rabbit_connection). - %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -122,7 +120,6 @@ %%---------------------------------------------------------------------------- boot() -> - ets:new(?CONNECTION_TABLE, [public, named_table]), ok = start(), ok = boot_tcp(), ok = boot_ssl(). @@ -300,15 +297,15 @@ start_client(Sock) -> start_ssl_client(SslOpts, Sock) -> start_client(Sock, ssl_transform_fun(SslOpts)). -register_connection(Pid) -> ets:insert(?CONNECTION_TABLE, {Pid}), ok. +register_connection(Pid) -> pg_local:join(rabbit_connections, Pid). -unregister_connection(Pid) -> ets:delete(?CONNECTION_TABLE, Pid), ok. +unregister_connection(Pid) -> pg_local:leave(rabbit_connections, Pid). connections() -> rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running), rabbit_networking, connections_local, []). -connections_local() -> [P || {P} <- ets:tab2list(?CONNECTION_TABLE)]. +connections_local() -> pg_local:get_members(rabbit_connections). connection_info_keys() -> rabbit_reader:info_keys(). -- cgit v1.2.1 From 62c4b3d9963ea78a5c36a3981958c8e6988f0756 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Mon, 21 Jan 2013 14:45:51 +0000 Subject: get th channel to flush the writer when the former is asked to terminate by the reader --- src/rabbit_channel.erl | 3 ++- src/rabbit_writer.erl | 24 +++++++++++++++++------- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b97af6d8..2b9cffd4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -280,7 +280,8 @@ handle_cast(ready_for_close, State = #ch{state = closing, ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}), {stop, normal, State}; -handle_cast(terminate, State) -> +handle_cast(terminate, State = #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:flush(WriterPid), {stop, normal, State}; handle_cast({command, #'basic.consume_ok'{consumer_tag = ConsumerTag} = Msg}, diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index a7ea3d99..059d3839 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -21,7 +21,8 @@ -export([start/5, start_link/5, start/6, start_link/6]). -export([send_command/2, send_command/3, send_command_sync/2, send_command_sync/3, - send_command_and_notify/4, send_command_and_notify/5]). + send_command_and_notify/4, send_command_and_notify/5, + flush/1]). -export([internal_send_command/4, internal_send_command/6]). %% internal @@ -69,6 +70,7 @@ (pid(), pid(), pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) -> 'ok'). +-spec(flush/1 :: (pid()) -> 'ok'). -spec(internal_send_command/4 :: (rabbit_net:socket(), rabbit_channel:channel_number(), rabbit_framing:amqp_method_record(), rabbit_types:protocol()) @@ -130,7 +132,7 @@ mainloop1(State) -> receive Message -> ?MODULE:mainloop1(handle_message(Message, State)) after 0 -> - ?MODULE:mainloop1(flush(State)) + ?MODULE:mainloop1(internal_flush(State)) end. handle_message({send_command, MethodRecord}, State) -> @@ -138,12 +140,18 @@ handle_message({send_command, MethodRecord}, State) -> handle_message({send_command, MethodRecord, Content}, State) -> internal_send_command_async(MethodRecord, Content, State); handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) -> - State1 = flush(internal_send_command_async(MethodRecord, State)), + State1 = internal_flush( + internal_send_command_async(MethodRecord, State)), gen_server:reply(From, ok), State1; handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}}, State) -> - State1 = flush(internal_send_command_async(MethodRecord, Content, State)), + State1 = internal_flush( + internal_send_command_async(MethodRecord, Content, State)), + gen_server:reply(From, ok), + State1; +handle_message({'$gen_call', From, flush}, State) -> + State1 = internal_flush(State), gen_server:reply(From, ok), State1; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord}, State) -> @@ -192,6 +200,8 @@ send_command_and_notify(W, Q, ChPid, MethodRecord, Content) -> W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content}, ok. +flush(W) -> call(W, flush). + %%--------------------------------------------------------------------------- call(Pid, Msg) -> @@ -251,13 +261,13 @@ internal_send_command_async(MethodRecord, Content, maybe_flush(State = #wstate{pending = Pending}) -> case iolist_size(Pending) >= ?FLUSH_THRESHOLD of - true -> flush(State); + true -> internal_flush(State); false -> State end. -flush(State = #wstate{pending = []}) -> +internal_flush(State = #wstate{pending = []}) -> State; -flush(State = #wstate{sock = Sock, pending = Pending}) -> +internal_flush(State = #wstate{sock = Sock, pending = Pending}) -> ok = port_cmd(Sock, lists:reverse(Pending)), State#wstate{pending = []}. -- cgit v1.2.1 From ad58ded86094cd49a4333fdd5d79f49feb591d55 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Mon, 21 Jan 2013 14:58:13 +0000 Subject: Backed out changeset 0ca8cbef9720 accidentally committed on 'stable' instead of bug25360 branch --- src/rabbit_channel.erl | 3 +-- src/rabbit_writer.erl | 24 +++++++----------------- 2 files changed, 8 insertions(+), 19 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 2b9cffd4..b97af6d8 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -280,8 +280,7 @@ handle_cast(ready_for_close, State = #ch{state = closing, ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}), {stop, normal, State}; -handle_cast(terminate, State = #ch{writer_pid = WriterPid}) -> - ok = rabbit_writer:flush(WriterPid), +handle_cast(terminate, State) -> {stop, normal, State}; handle_cast({command, #'basic.consume_ok'{consumer_tag = ConsumerTag} = Msg}, diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 059d3839..a7ea3d99 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -21,8 +21,7 @@ -export([start/5, start_link/5, start/6, start_link/6]). -export([send_command/2, send_command/3, send_command_sync/2, send_command_sync/3, - send_command_and_notify/4, send_command_and_notify/5, - flush/1]). + send_command_and_notify/4, send_command_and_notify/5]). -export([internal_send_command/4, internal_send_command/6]). %% internal @@ -70,7 +69,6 @@ (pid(), pid(), pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) -> 'ok'). --spec(flush/1 :: (pid()) -> 'ok'). -spec(internal_send_command/4 :: (rabbit_net:socket(), rabbit_channel:channel_number(), rabbit_framing:amqp_method_record(), rabbit_types:protocol()) @@ -132,7 +130,7 @@ mainloop1(State) -> receive Message -> ?MODULE:mainloop1(handle_message(Message, State)) after 0 -> - ?MODULE:mainloop1(internal_flush(State)) + ?MODULE:mainloop1(flush(State)) end. handle_message({send_command, MethodRecord}, State) -> @@ -140,18 +138,12 @@ handle_message({send_command, MethodRecord}, State) -> handle_message({send_command, MethodRecord, Content}, State) -> internal_send_command_async(MethodRecord, Content, State); handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) -> - State1 = internal_flush( - internal_send_command_async(MethodRecord, State)), + State1 = flush(internal_send_command_async(MethodRecord, State)), gen_server:reply(From, ok), State1; handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}}, State) -> - State1 = internal_flush( - internal_send_command_async(MethodRecord, Content, State)), - gen_server:reply(From, ok), - State1; -handle_message({'$gen_call', From, flush}, State) -> - State1 = internal_flush(State), + State1 = flush(internal_send_command_async(MethodRecord, Content, State)), gen_server:reply(From, ok), State1; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord}, State) -> @@ -200,8 +192,6 @@ send_command_and_notify(W, Q, ChPid, MethodRecord, Content) -> W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content}, ok. -flush(W) -> call(W, flush). - %%--------------------------------------------------------------------------- call(Pid, Msg) -> @@ -261,13 +251,13 @@ internal_send_command_async(MethodRecord, Content, maybe_flush(State = #wstate{pending = Pending}) -> case iolist_size(Pending) >= ?FLUSH_THRESHOLD of - true -> internal_flush(State); + true -> flush(State); false -> State end. -internal_flush(State = #wstate{pending = []}) -> +flush(State = #wstate{pending = []}) -> State; -internal_flush(State = #wstate{sock = Sock, pending = Pending}) -> +flush(State = #wstate{sock = Sock, pending = Pending}) -> ok = port_cmd(Sock, lists:reverse(Pending)), State#wstate{pending = []}. -- cgit v1.2.1 From ac66b65c6359d75ff5adfc7b955e92286990933e Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Mon, 21 Jan 2013 14:59:55 +0000 Subject: get the channel to flush the writer when the former is asked to terminate by the reader --- src/rabbit_channel.erl | 3 ++- src/rabbit_writer.erl | 24 +++++++++++++++++------- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b97af6d8..2b9cffd4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -280,7 +280,8 @@ handle_cast(ready_for_close, State = #ch{state = closing, ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}), {stop, normal, State}; -handle_cast(terminate, State) -> +handle_cast(terminate, State = #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:flush(WriterPid), {stop, normal, State}; handle_cast({command, #'basic.consume_ok'{consumer_tag = ConsumerTag} = Msg}, diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index a7ea3d99..059d3839 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -21,7 +21,8 @@ -export([start/5, start_link/5, start/6, start_link/6]). -export([send_command/2, send_command/3, send_command_sync/2, send_command_sync/3, - send_command_and_notify/4, send_command_and_notify/5]). + send_command_and_notify/4, send_command_and_notify/5, + flush/1]). -export([internal_send_command/4, internal_send_command/6]). %% internal @@ -69,6 +70,7 @@ (pid(), pid(), pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) -> 'ok'). +-spec(flush/1 :: (pid()) -> 'ok'). -spec(internal_send_command/4 :: (rabbit_net:socket(), rabbit_channel:channel_number(), rabbit_framing:amqp_method_record(), rabbit_types:protocol()) @@ -130,7 +132,7 @@ mainloop1(State) -> receive Message -> ?MODULE:mainloop1(handle_message(Message, State)) after 0 -> - ?MODULE:mainloop1(flush(State)) + ?MODULE:mainloop1(internal_flush(State)) end. handle_message({send_command, MethodRecord}, State) -> @@ -138,12 +140,18 @@ handle_message({send_command, MethodRecord}, State) -> handle_message({send_command, MethodRecord, Content}, State) -> internal_send_command_async(MethodRecord, Content, State); handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) -> - State1 = flush(internal_send_command_async(MethodRecord, State)), + State1 = internal_flush( + internal_send_command_async(MethodRecord, State)), gen_server:reply(From, ok), State1; handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}}, State) -> - State1 = flush(internal_send_command_async(MethodRecord, Content, State)), + State1 = internal_flush( + internal_send_command_async(MethodRecord, Content, State)), + gen_server:reply(From, ok), + State1; +handle_message({'$gen_call', From, flush}, State) -> + State1 = internal_flush(State), gen_server:reply(From, ok), State1; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord}, State) -> @@ -192,6 +200,8 @@ send_command_and_notify(W, Q, ChPid, MethodRecord, Content) -> W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content}, ok. +flush(W) -> call(W, flush). + %%--------------------------------------------------------------------------- call(Pid, Msg) -> @@ -251,13 +261,13 @@ internal_send_command_async(MethodRecord, Content, maybe_flush(State = #wstate{pending = Pending}) -> case iolist_size(Pending) >= ?FLUSH_THRESHOLD of - true -> flush(State); + true -> internal_flush(State); false -> State end. -flush(State = #wstate{pending = []}) -> +internal_flush(State = #wstate{pending = []}) -> State; -flush(State = #wstate{sock = Sock, pending = Pending}) -> +internal_flush(State = #wstate{sock = Sock, pending = Pending}) -> ok = port_cmd(Sock, lists:reverse(Pending)), State#wstate{pending = []}. -- cgit v1.2.1 From c408184ffd653dc7b52fe584b9e13f410229e142 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 22 Jan 2013 12:50:15 +0000 Subject: our test writer needs to do a bit more now --- src/rabbit_tests.erl | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index a68caadb..7a0ed1af 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1084,9 +1084,16 @@ test_policy_validation() -> rabbit_runtime_parameters_test:unregister_policy_validator(), passed. +writer() -> + receive + {'$gen_call', From, flush} -> gen_server:reply(From, ok), + writer(); + shutdown -> ok + end. + test_server_status() -> %% create a few things so there is some useful information to list - Writer = spawn(fun () -> receive shutdown -> ok end end), + Writer = spawn(fun writer/0), {ok, Ch} = rabbit_channel:start_link( 1, self(), Writer, self(), "", rabbit_framing_amqp_0_9_1, user(<<"user">>), <<"/">>, [], self(), -- cgit v1.2.1 From a09d21e32b4147bab8e33a177a41fc68476e440a Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 22 Jan 2013 13:58:49 +0000 Subject: one test writer is quite enough. and make it work. --- src/rabbit_tests.erl | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 7a0ed1af..1e02ff6b 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1084,16 +1084,9 @@ test_policy_validation() -> rabbit_runtime_parameters_test:unregister_policy_validator(), passed. -writer() -> - receive - {'$gen_call', From, flush} -> gen_server:reply(From, ok), - writer(); - shutdown -> ok - end. - test_server_status() -> %% create a few things so there is some useful information to list - Writer = spawn(fun writer/0), + Writer = spawn(fun test_writer/0), {ok, Ch} = rabbit_channel:start_link( 1, self(), Writer, self(), "", rabbit_framing_amqp_0_9_1, user(<<"user">>), <<"/">>, [], self(), @@ -1167,10 +1160,15 @@ test_server_status() -> passed. +test_writer() -> test_writer(none). + test_writer(Pid) -> receive - shutdown -> ok; - {send_command, Method} -> Pid ! Method, test_writer(Pid) + {'$gen_call', From, flush} -> gen_server:reply(From, ok), + test_writer(Pid); + {send_command, Method} -> Pid ! Method, + test_writer(Pid); + shutdown -> ok end. test_spawn() -> -- cgit v1.2.1