diff options
author | Tim Watson <tim@rabbitmq.com> | 2013-01-21 16:06:08 +0000 |
---|---|---|
committer | Tim Watson <tim@rabbitmq.com> | 2013-01-21 16:06:08 +0000 |
commit | c1f58c7ae294c9986bbbe3027f597daab58e8c48 (patch) | |
tree | ebd3447cae41e410513fa3832dab5c6315d22b52 | |
parent | 83f0ae9e8d0021279613eed59db8b18db0b148ee (diff) | |
parent | 3a79219d6ea1acf176a92110c10f5d223baa9fbd (diff) | |
download | rabbitmq-server-c1f58c7ae294c9986bbbe3027f597daab58e8c48.tar.gz |
merge bug25397 into default
-rw-r--r-- | src/rabbit_backing_queue.erl | 6 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 4 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 13 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 9 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 16 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 4 |
6 files changed, 35 insertions, 17 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 9a3c67f9..2b43c8ba 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -75,6 +75,10 @@ %% except those that have been fetched already and are pending acks. -callback purge(state()) -> {purged_msg_count(), state()}. +%% Remove all messages in the queue which have been fetched and are +%% pending acks. +-callback purge_acks(state()) -> state(). + %% Publish a message. -callback publish(rabbit_types:basic_message(), rabbit_types:message_properties(), boolean(), pid(), @@ -226,7 +230,7 @@ behaviour_info(callbacks) -> [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2}, - {delete_and_terminate, 2}, {purge, 1}, {publish, 5}, + {delete_and_terminate, 2}, {purge, 1}, {purge_acks, 1}, {publish, 5}, {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, {dropwhile, 2}, {fetchwhile, 4}, {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1}, diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index b5f72cad..c704804e 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -17,7 +17,7 @@ -module(rabbit_mirror_queue_master). -export([init/3, terminate/2, delete_and_terminate/2, - purge/1, publish/5, publish_delivered/4, + purge/1, purge_acks/1, publish/5, publish_delivered/4, discard/3, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, @@ -198,6 +198,8 @@ purge(State = #state { gm = GM, {Count, BQS1} = BQ:purge(BQS), {Count, State #state { backing_queue_state = BQS1 }}. +purge_acks(_State) -> exit({not_implemented, {?MODULE, purge_acks}}). + publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, State = #state { gm = GM, seen_status = SS, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 9f12b34e..867aa2ed 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -37,18 +37,10 @@ -include("rabbit.hrl"). -%%---------------------------------------------------------------------------- - -include("gm_specs.hrl"). --ifdef(use_specs). -%% Shut dialyzer up --spec(promote_me/2 :: (_, _) -> no_return()). --endif. - %%---------------------------------------------------------------------------- - -define(CREATION_EVENT_KEYS, [pid, name, @@ -79,6 +71,8 @@ depth_delta }). +%%---------------------------------------------------------------------------- + start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). set_maximum_since_use(QPid, Age) -> @@ -469,6 +463,9 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) -> handle_process_result({ok, State}) -> noreply(State); handle_process_result({stop, State}) -> {stop, normal, State}. +-ifdef(use_specs). +-spec(promote_me/2 :: ({pid(), term()}, #state{}) -> no_return()). +-endif. promote_me(From, #state { q = Q = #amqqueue { name = QName }, gm = GM, backing_queue = BQ, diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index ee430fb4..080e0987 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -41,8 +41,6 @@ -define(FIRST_TEST_BIND_PORT, 10000). --define(CONNECTION_TABLE, rabbit_connection). - %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -122,7 +120,6 @@ %%---------------------------------------------------------------------------- boot() -> - ets:new(?CONNECTION_TABLE, [public, named_table]), ok = start(), ok = boot_tcp(), ok = boot_ssl(). @@ -300,15 +297,15 @@ start_client(Sock) -> start_ssl_client(SslOpts, Sock) -> start_client(Sock, ssl_transform_fun(SslOpts)). -register_connection(Pid) -> ets:insert(?CONNECTION_TABLE, {Pid}), ok. +register_connection(Pid) -> pg_local:join(rabbit_connections, Pid). -unregister_connection(Pid) -> ets:delete(?CONNECTION_TABLE, Pid), ok. +unregister_connection(Pid) -> pg_local:leave(rabbit_connections, Pid). connections() -> rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running), rabbit_networking, connections_local, []). -connections_local() -> [P || {P} <- ets:tab2list(?CONNECTION_TABLE)]. +connections_local() -> pg_local:get_members(rabbit_connections). connection_info_keys() -> rabbit_reader:info_keys(). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 13454d31..7bd8d541 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2323,6 +2323,7 @@ test_variable_queue() -> fun test_dropwhile_varying_ram_duration/1, fun test_fetchwhile_varying_ram_duration/1, fun test_variable_queue_ack_limiting/1, + fun test_variable_queue_purge/1, fun test_variable_queue_requeue/1, fun test_variable_queue_fold/1]], passed. @@ -2418,6 +2419,21 @@ test_variable_queue_requeue(VQ0) -> {empty, VQ3} = rabbit_variable_queue:fetch(true, VQ2), VQ3. +test_variable_queue_purge(VQ0) -> + LenDepth = fun (VQ) -> + {rabbit_variable_queue:len(VQ), + rabbit_variable_queue:depth(VQ)} + end, + VQ1 = variable_queue_publish(false, 10, VQ0), + {VQ2, Acks} = variable_queue_fetch(6, false, false, 10, VQ1), + {4, VQ3} = rabbit_variable_queue:purge(VQ2), + {0, 6} = LenDepth(VQ3), + {_, VQ4} = rabbit_variable_queue:requeue(lists:sublist(Acks, 2), VQ3), + {2, 6} = LenDepth(VQ4), + VQ5 = rabbit_variable_queue:purge_acks(VQ4), + {2, 2} = LenDepth(VQ5), + VQ5. + test_variable_queue_ack_limiting(VQ0) -> %% start by sending in a bunch of messages Len = 1024, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c0552577..063eef0b 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -16,7 +16,7 @@ -module(rabbit_variable_queue). --export([init/3, terminate/2, delete_and_terminate/2, purge/1, +-export([init/3, terminate/2, delete_and_terminate/2, purge/1, purge_acks/1, publish/5, publish_delivered/4, discard/3, drain_confirmed/1, dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, @@ -519,6 +519,8 @@ purge(State = #vqstate { q4 = Q4, ram_msg_count = 0, persistent_count = PCount1 })}. +purge_acks(State) -> a(purge_pending_ack(false, State)). + publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, IsDelivered, _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, |