summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2013-01-21 16:06:08 +0000
committerTim Watson <tim@rabbitmq.com>2013-01-21 16:06:08 +0000
commitc1f58c7ae294c9986bbbe3027f597daab58e8c48 (patch)
treeebd3447cae41e410513fa3832dab5c6315d22b52
parent83f0ae9e8d0021279613eed59db8b18db0b148ee (diff)
parent3a79219d6ea1acf176a92110c10f5d223baa9fbd (diff)
downloadrabbitmq-server-c1f58c7ae294c9986bbbe3027f597daab58e8c48.tar.gz
merge bug25397 into default
-rw-r--r--src/rabbit_backing_queue.erl6
-rw-r--r--src/rabbit_mirror_queue_master.erl4
-rw-r--r--src/rabbit_mirror_queue_slave.erl13
-rw-r--r--src/rabbit_networking.erl9
-rw-r--r--src/rabbit_tests.erl16
-rw-r--r--src/rabbit_variable_queue.erl4
6 files changed, 35 insertions, 17 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 9a3c67f9..2b43c8ba 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -75,6 +75,10 @@
%% except those that have been fetched already and are pending acks.
-callback purge(state()) -> {purged_msg_count(), state()}.
+%% Remove all messages in the queue which have been fetched and are
+%% pending acks.
+-callback purge_acks(state()) -> state().
+
%% Publish a message.
-callback publish(rabbit_types:basic_message(),
rabbit_types:message_properties(), boolean(), pid(),
@@ -226,7 +230,7 @@
behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
- {delete_and_terminate, 2}, {purge, 1}, {publish, 5},
+ {delete_and_terminate, 2}, {purge, 1}, {purge_acks, 1}, {publish, 5},
{publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1},
{dropwhile, 2}, {fetchwhile, 4},
{fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index b5f72cad..c704804e 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -17,7 +17,7 @@
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/5, publish_delivered/4,
+ purge/1, purge_acks/1, publish/5, publish_delivered/4,
discard/3, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3,
len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
@@ -198,6 +198,8 @@ purge(State = #state { gm = GM,
{Count, BQS1} = BQ:purge(BQS),
{Count, State #state { backing_queue_state = BQS1 }}.
+purge_acks(_State) -> exit({not_implemented, {?MODULE, purge_acks}}).
+
publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid,
State = #state { gm = GM,
seen_status = SS,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 9f12b34e..867aa2ed 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -37,18 +37,10 @@
-include("rabbit.hrl").
-%%----------------------------------------------------------------------------
-
-include("gm_specs.hrl").
--ifdef(use_specs).
-%% Shut dialyzer up
--spec(promote_me/2 :: (_, _) -> no_return()).
--endif.
-
%%----------------------------------------------------------------------------
-
-define(CREATION_EVENT_KEYS,
[pid,
name,
@@ -79,6 +71,8 @@
depth_delta
}).
+%%----------------------------------------------------------------------------
+
start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
set_maximum_since_use(QPid, Age) ->
@@ -469,6 +463,9 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
handle_process_result({ok, State}) -> noreply(State);
handle_process_result({stop, State}) -> {stop, normal, State}.
+-ifdef(use_specs).
+-spec(promote_me/2 :: ({pid(), term()}, #state{}) -> no_return()).
+-endif.
promote_me(From, #state { q = Q = #amqqueue { name = QName },
gm = GM,
backing_queue = BQ,
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index ee430fb4..080e0987 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -41,8 +41,6 @@
-define(FIRST_TEST_BIND_PORT, 10000).
--define(CONNECTION_TABLE, rabbit_connection).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -122,7 +120,6 @@
%%----------------------------------------------------------------------------
boot() ->
- ets:new(?CONNECTION_TABLE, [public, named_table]),
ok = start(),
ok = boot_tcp(),
ok = boot_ssl().
@@ -300,15 +297,15 @@ start_client(Sock) ->
start_ssl_client(SslOpts, Sock) ->
start_client(Sock, ssl_transform_fun(SslOpts)).
-register_connection(Pid) -> ets:insert(?CONNECTION_TABLE, {Pid}), ok.
+register_connection(Pid) -> pg_local:join(rabbit_connections, Pid).
-unregister_connection(Pid) -> ets:delete(?CONNECTION_TABLE, Pid), ok.
+unregister_connection(Pid) -> pg_local:leave(rabbit_connections, Pid).
connections() ->
rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
rabbit_networking, connections_local, []).
-connections_local() -> [P || {P} <- ets:tab2list(?CONNECTION_TABLE)].
+connections_local() -> pg_local:get_members(rabbit_connections).
connection_info_keys() -> rabbit_reader:info_keys().
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 13454d31..7bd8d541 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2323,6 +2323,7 @@ test_variable_queue() ->
fun test_dropwhile_varying_ram_duration/1,
fun test_fetchwhile_varying_ram_duration/1,
fun test_variable_queue_ack_limiting/1,
+ fun test_variable_queue_purge/1,
fun test_variable_queue_requeue/1,
fun test_variable_queue_fold/1]],
passed.
@@ -2418,6 +2419,21 @@ test_variable_queue_requeue(VQ0) ->
{empty, VQ3} = rabbit_variable_queue:fetch(true, VQ2),
VQ3.
+test_variable_queue_purge(VQ0) ->
+ LenDepth = fun (VQ) ->
+ {rabbit_variable_queue:len(VQ),
+ rabbit_variable_queue:depth(VQ)}
+ end,
+ VQ1 = variable_queue_publish(false, 10, VQ0),
+ {VQ2, Acks} = variable_queue_fetch(6, false, false, 10, VQ1),
+ {4, VQ3} = rabbit_variable_queue:purge(VQ2),
+ {0, 6} = LenDepth(VQ3),
+ {_, VQ4} = rabbit_variable_queue:requeue(lists:sublist(Acks, 2), VQ3),
+ {2, 6} = LenDepth(VQ4),
+ VQ5 = rabbit_variable_queue:purge_acks(VQ4),
+ {2, 2} = LenDepth(VQ5),
+ VQ5.
+
test_variable_queue_ack_limiting(VQ0) ->
%% start by sending in a bunch of messages
Len = 1024,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c0552577..063eef0b 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,7 +16,7 @@
-module(rabbit_variable_queue).
--export([init/3, terminate/2, delete_and_terminate/2, purge/1,
+-export([init/3, terminate/2, delete_and_terminate/2, purge/1, purge_acks/1,
publish/5, publish_delivered/4, discard/3, drain_confirmed/1,
dropwhile/2, fetchwhile/4,
fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1,
@@ -519,6 +519,8 @@ purge(State = #vqstate { q4 = Q4,
ram_msg_count = 0,
persistent_count = PCount1 })}.
+purge_acks(State) -> a(purge_pending_ack(false, State)).
+
publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
MsgProps = #message_properties { needs_confirming = NeedsConfirming },
IsDelivered, _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,