summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-10-06 18:20:11 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-10-06 18:20:11 +0100
commit89116a5512be7754476803a7d03bb692672c386d (patch)
tree532c71bfbd0763cb582a883810d77224008b3c1c
parentb4663371bb6609acbe35797c8fcf235172a6e115 (diff)
parent37f21f92299df57edf05c8c2291538cc69e07ec9 (diff)
downloadrabbitmq-server-89116a5512be7754476803a7d03bb692672c386d.tar.gz
merge bug23133 into default
-rw-r--r--include/rabbit_backing_queue_spec.hrl17
-rw-r--r--src/rabbit_amqqueue_process.erl8
-rw-r--r--src/rabbit_basic.erl26
-rw-r--r--src/vm_memory_monitor.erl5
4 files changed, 29 insertions, 27 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 005994f0..38c6f939 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -30,8 +30,9 @@
%%
-type(fetch_result() ::
- %% Message, IsDelivered, AckTag, Remaining_Len
- ('empty'|{rabbit_types:basic_message(), boolean(), ack(), non_neg_integer()})).
+ ('empty' |
+ %% Message, IsDelivered, AckTag, RemainingLen
+ {rabbit_types:basic_message(), boolean(), ack(), non_neg_integer()})).
-type(is_durable() :: boolean()).
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
@@ -39,19 +40,23 @@
-spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok').
-spec(stop/0 :: () -> 'ok').
--spec(init/3 :: (rabbit_amqqueue:name(), is_durable(), attempt_recovery()) -> state()).
+-spec(init/3 :: (rabbit_amqqueue:name(), is_durable(), attempt_recovery()) ->
+ state()).
-spec(terminate/1 :: (state()) -> state()).
-spec(delete_and_terminate/1 :: (state()) -> state()).
-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
-spec(publish/2 :: (rabbit_types:basic_message(), state()) -> state()).
-spec(publish_delivered/3 ::
- (ack_required(), rabbit_types:basic_message(), state()) -> {ack(), state()}).
+ (ack_required(), rabbit_types:basic_message(), state()) ->
+ {ack(), state()}).
-spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}).
-spec(ack/2 :: ([ack()], state()) -> state()).
--spec(tx_publish/3 :: (rabbit_types:txn(), rabbit_types:basic_message(), state()) -> state()).
+-spec(tx_publish/3 :: (rabbit_types:txn(), rabbit_types:basic_message(),
+ state()) -> state()).
-spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()).
-spec(tx_rollback/2 :: (rabbit_types:txn(), state()) -> {[ack()], state()}).
--spec(tx_commit/3 :: (rabbit_types:txn(), fun (() -> any()), state()) -> {[ack()], state()}).
+-spec(tx_commit/3 :: (rabbit_types:txn(), fun (() -> any()), state()) ->
+ {[ack()], state()}).
-spec(requeue/2 :: ([ack()], state()) -> state()).
-spec(len/1 :: (state()) -> non_neg_integer()).
-spec(is_empty/1 :: (state()) -> boolean()).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2c53a8e3..61204deb 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -251,8 +251,9 @@ stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
State#q{expiry_timer_ref = undefined}.
-%% We only wish to expire where there are no consumers *and* when
-%% basic.get hasn't been called for the configured period.
+%% We wish to expire only when there are no consumers *and* the expiry
+%% hasn't been refreshed (by queue.declare or basic.get) for the
+%% configured period.
ensure_expiry_timer(State = #q{expires = undefined}) ->
State;
ensure_expiry_timer(State = #q{expires = Expires}) ->
@@ -783,7 +784,8 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
handle_call(stat, _From, State = #q{backing_queue = BQ,
backing_queue_state = BQS,
active_consumers = ActiveConsumers}) ->
- reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State);
+ reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)},
+ ensure_expiry_timer(State));
handle_call(delete_exclusive, _From,
State = #q{ backing_queue_state = BQS,
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index d62fc07c..38412982 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -55,28 +55,24 @@
rabbit_types:message()) -> rabbit_types:delivery()).
-spec(message/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
- properties_input(), binary())
- -> (rabbit_types:message() | rabbit_types:error(any()))).
+ properties_input(), binary()) ->
+ (rabbit_types:message() | rabbit_types:error(any()))).
-spec(properties/1 ::
(properties_input()) -> rabbit_framing:amqp_property_record()).
-spec(publish/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
- properties_input(), binary())
- -> publish_result()).
+ properties_input(), binary()) -> publish_result()).
-spec(publish/7 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()),
- properties_input(), binary())
- -> publish_result()).
--spec(build_content/2 ::
- (rabbit_framing:amqp_property_record(), binary())
- -> rabbit_types:content()).
--spec(from_content/1 ::
- (rabbit_types:content())
- -> {rabbit_framing:amqp_property_record(), binary()}).
--spec(is_message_persistent/1 ::
- (rabbit_types:decoded_content())
- -> (boolean() | {'invalid', non_neg_integer()})).
+ properties_input(), binary()) -> publish_result()).
+-spec(build_content/2 :: (rabbit_framing:amqp_property_record(), binary()) ->
+ rabbit_types:content()).
+-spec(from_content/1 :: (rabbit_types:content()) ->
+ {rabbit_framing:amqp_property_record(), binary()}).
+-spec(is_message_persistent/1 :: (rabbit_types:decoded_content()) ->
+ (boolean() |
+ {'invalid', non_neg_integer()})).
-endif.
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index 067fa9f2..9eb9d0a6 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -47,7 +47,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([update/0, get_total_memory/0,
+-export([update/0, get_total_memory/0, get_vm_limit/0,
get_check_interval/0, set_check_interval/1,
get_vm_memory_high_watermark/0, set_vm_memory_high_watermark/1,
get_memory_limit/0]).
@@ -76,7 +76,7 @@
-spec(update/0 :: () -> 'ok').
-spec(get_total_memory/0 :: () -> (non_neg_integer() | 'unknown')).
-spec(get_vm_limit/0 :: () -> non_neg_integer()).
--spec(get_memory_limit/0 :: () -> (non_neg_integer() | 'undefined')).
+-spec(get_memory_limit/0 :: () -> non_neg_integer()).
-spec(get_check_interval/0 :: () -> non_neg_integer()).
-spec(set_check_interval/1 :: (non_neg_integer()) -> 'ok').
-spec(get_vm_memory_high_watermark/0 :: () -> float()).
@@ -84,7 +84,6 @@
-endif.
-
%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------