summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2013-01-22 15:51:33 +0000
committerTim Watson <tim@rabbitmq.com>2013-01-22 15:51:33 +0000
commitd6ba4aa042bcef61e3dd3b15322ec00b5bc328c4 (patch)
tree93644c56909036002bfb044d032fd34bb4ade271
parentf27c502034c9e5218e280c4a39da88562b466f51 (diff)
parentfe6889ea21da979ce76f193d996361f670d7380a (diff)
downloadrabbitmq-server-d6ba4aa042bcef61e3dd3b15322ec00b5bc328c4.tar.gz
merge bug25394 into default
-rw-r--r--src/rabbit_channel.erl3
-rw-r--r--src/rabbit_mirror_queue_slave.erl13
-rw-r--r--src/rabbit_networking.erl9
-rw-r--r--src/rabbit_tests.erl11
-rw-r--r--src/rabbit_writer.erl24
5 files changed, 35 insertions, 25 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 2b89be8f..614e307c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -284,7 +284,8 @@ handle_cast(ready_for_close, State = #ch{state = closing,
ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}),
{stop, normal, State};
-handle_cast(terminate, State) ->
+handle_cast(terminate, State = #ch{writer_pid = WriterPid}) ->
+ ok = rabbit_writer:flush(WriterPid),
{stop, normal, State};
handle_cast({command, #'basic.consume_ok'{consumer_tag = ConsumerTag} = Msg},
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index cd2a8042..27b0326d 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -37,18 +37,10 @@
-include("rabbit.hrl").
-%%----------------------------------------------------------------------------
-
-include("gm_specs.hrl").
--ifdef(use_specs).
-%% Shut dialyzer up
--spec(promote_me/2 :: (_, _) -> no_return()).
--endif.
-
%%----------------------------------------------------------------------------
-
-define(CREATION_EVENT_KEYS,
[pid,
name,
@@ -79,6 +71,8 @@
depth_delta
}).
+%%----------------------------------------------------------------------------
+
start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
set_maximum_since_use(QPid, Age) ->
@@ -471,6 +465,9 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
handle_process_result({ok, State}) -> noreply(State);
handle_process_result({stop, State}) -> {stop, normal, State}.
+-ifdef(use_specs).
+-spec(promote_me/2 :: ({pid(), term()}, #state{}) -> no_return()).
+-endif.
promote_me(From, #state { q = Q = #amqqueue { name = QName },
gm = GM,
backing_queue = BQ,
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index ee430fb4..080e0987 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -41,8 +41,6 @@
-define(FIRST_TEST_BIND_PORT, 10000).
--define(CONNECTION_TABLE, rabbit_connection).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -122,7 +120,6 @@
%%----------------------------------------------------------------------------
boot() ->
- ets:new(?CONNECTION_TABLE, [public, named_table]),
ok = start(),
ok = boot_tcp(),
ok = boot_ssl().
@@ -300,15 +297,15 @@ start_client(Sock) ->
start_ssl_client(SslOpts, Sock) ->
start_client(Sock, ssl_transform_fun(SslOpts)).
-register_connection(Pid) -> ets:insert(?CONNECTION_TABLE, {Pid}), ok.
+register_connection(Pid) -> pg_local:join(rabbit_connections, Pid).
-unregister_connection(Pid) -> ets:delete(?CONNECTION_TABLE, Pid), ok.
+unregister_connection(Pid) -> pg_local:leave(rabbit_connections, Pid).
connections() ->
rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
rabbit_networking, connections_local, []).
-connections_local() -> [P || {P} <- ets:tab2list(?CONNECTION_TABLE)].
+connections_local() -> pg_local:get_members(rabbit_connections).
connection_info_keys() -> rabbit_reader:info_keys().
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index c47f2772..b0ff5af9 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1086,7 +1086,7 @@ test_policy_validation() ->
test_server_status() ->
%% create a few things so there is some useful information to list
- Writer = spawn(fun () -> receive shutdown -> ok end end),
+ Writer = spawn(fun test_writer/0),
{ok, Ch} = rabbit_channel:start_link(
1, self(), Writer, self(), "", rabbit_framing_amqp_0_9_1,
user(<<"user">>), <<"/">>, [], self(),
@@ -1161,10 +1161,15 @@ test_server_status() ->
passed.
+test_writer() -> test_writer(none).
+
test_writer(Pid) ->
receive
- shutdown -> ok;
- {send_command, Method} -> Pid ! Method, test_writer(Pid)
+ {'$gen_call', From, flush} -> gen_server:reply(From, ok),
+ test_writer(Pid);
+ {send_command, Method} -> Pid ! Method,
+ test_writer(Pid);
+ shutdown -> ok
end.
test_spawn() ->
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index a7ea3d99..059d3839 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -21,7 +21,8 @@
-export([start/5, start_link/5, start/6, start_link/6]).
-export([send_command/2, send_command/3,
send_command_sync/2, send_command_sync/3,
- send_command_and_notify/4, send_command_and_notify/5]).
+ send_command_and_notify/4, send_command_and_notify/5,
+ flush/1]).
-export([internal_send_command/4, internal_send_command/6]).
%% internal
@@ -69,6 +70,7 @@
(pid(), pid(), pid(), rabbit_framing:amqp_method_record(),
rabbit_types:content())
-> 'ok').
+-spec(flush/1 :: (pid()) -> 'ok').
-spec(internal_send_command/4 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
rabbit_framing:amqp_method_record(), rabbit_types:protocol())
@@ -130,7 +132,7 @@ mainloop1(State) ->
receive
Message -> ?MODULE:mainloop1(handle_message(Message, State))
after 0 ->
- ?MODULE:mainloop1(flush(State))
+ ?MODULE:mainloop1(internal_flush(State))
end.
handle_message({send_command, MethodRecord}, State) ->
@@ -138,12 +140,18 @@ handle_message({send_command, MethodRecord}, State) ->
handle_message({send_command, MethodRecord, Content}, State) ->
internal_send_command_async(MethodRecord, Content, State);
handle_message({'$gen_call', From, {send_command_sync, MethodRecord}}, State) ->
- State1 = flush(internal_send_command_async(MethodRecord, State)),
+ State1 = internal_flush(
+ internal_send_command_async(MethodRecord, State)),
gen_server:reply(From, ok),
State1;
handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}},
State) ->
- State1 = flush(internal_send_command_async(MethodRecord, Content, State)),
+ State1 = internal_flush(
+ internal_send_command_async(MethodRecord, Content, State)),
+ gen_server:reply(From, ok),
+ State1;
+handle_message({'$gen_call', From, flush}, State) ->
+ State1 = internal_flush(State),
gen_server:reply(From, ok),
State1;
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord}, State) ->
@@ -192,6 +200,8 @@ send_command_and_notify(W, Q, ChPid, MethodRecord, Content) ->
W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content},
ok.
+flush(W) -> call(W, flush).
+
%%---------------------------------------------------------------------------
call(Pid, Msg) ->
@@ -251,13 +261,13 @@ internal_send_command_async(MethodRecord, Content,
maybe_flush(State = #wstate{pending = Pending}) ->
case iolist_size(Pending) >= ?FLUSH_THRESHOLD of
- true -> flush(State);
+ true -> internal_flush(State);
false -> State
end.
-flush(State = #wstate{pending = []}) ->
+internal_flush(State = #wstate{pending = []}) ->
State;
-flush(State = #wstate{sock = Sock, pending = Pending}) ->
+internal_flush(State = #wstate{sock = Sock, pending = Pending}) ->
ok = port_cmd(Sock, lists:reverse(Pending)),
State#wstate{pending = []}.