diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-04-01 15:08:42 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-04-01 15:08:42 +0100 |
commit | 074f16acbd597a51c941f716cdba9c20fca13153 (patch) | |
tree | ff1c9a2e6baf9896d7d7c1a9c5946b1f71e8f013 | |
parent | e44b23cbcacc4d531c1a61209cdca870d966e682 (diff) | |
download | rabbitmq-server-bug20546.tar.gz |
wait foreverbug20546
-rw-r--r-- | src/rabbit_alarm.erl | 3 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 30 | ||||
-rw-r--r-- | src/rabbit_guid.erl | 3 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 2 | ||||
-rw-r--r-- | src/rabbit_persister.erl | 8 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 4 | ||||
-rw-r--r-- | src/rabbit_router.erl | 3 |
7 files changed, 28 insertions, 25 deletions
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 875624ba..21999f16 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -78,7 +78,8 @@ stop() -> register(Pid, HighMemMFA) -> ok = gen_event:call(alarm_handler, ?MODULE, - {register, Pid, HighMemMFA}). + {register, Pid, HighMemMFA}, + infinity). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c941f307..fdc99c38 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -213,10 +213,10 @@ list(VHostPath) -> map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> - gen_server2:call(QPid, info). + gen_server2:call(QPid, info, infinity). info(#amqqueue{ pid = QPid }, Items) -> - case gen_server2:call(QPid, {info, Items}) of + case gen_server2:call(QPid, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -225,20 +225,20 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). -stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat). +stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity). stat_all() -> lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> - gen_server2:call(QPid, {delete, IfUnused, IfEmpty}). + gen_server2:call(QPid, {delete, IfUnused, IfEmpty}, infinity). -purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge). +purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity). deliver(_IsMandatory, true, Txn, Message, QPid) -> - gen_server2:call(QPid, {deliver_immediately, Txn, Message}); + gen_server2:call(QPid, {deliver_immediately, Txn, Message}, infinity); deliver(true, _IsImmediate, Txn, Message, QPid) -> - gen_server2:call(QPid, {deliver, Txn, Message}), + gen_server2:call(QPid, {deliver, Txn, Message}, infinity), true; deliver(false, _IsImmediate, Txn, Message, QPid) -> gen_server2:cast(QPid, {deliver, Txn, Message}), @@ -254,10 +254,9 @@ ack(QPid, Txn, MsgIds, ChPid) -> gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}). commit_all(QPids, Txn) -> - Timeout = length(QPids) * ?CALL_TIMEOUT, safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, Timeout) end, + fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, infinity) end, QPids). rollback_all(QPids, Txn) -> @@ -267,12 +266,11 @@ rollback_all(QPids, Txn) -> QPids). notify_down_all(QPids, ChPid) -> - Timeout = length(QPids) * ?CALL_TIMEOUT, safe_pmap_ok( %% we don't care if the queue process has terminated in the %% meantime fun (_) -> ok end, - fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, Timeout) end, + fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end, QPids). limit_all(QPids, ChPid, LimiterPid) -> @@ -282,18 +280,20 @@ limit_all(QPids, ChPid, LimiterPid) -> QPids). claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> - gen_server2:call(QPid, {claim_queue, ReaderPid}). + gen_server2:call(QPid, {claim_queue, ReaderPid}, infinity). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> - gen_server2:call(QPid, {basic_get, ChPid, NoAck}). + gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity). basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, - LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}). + LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, + infinity). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> - ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). + ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}, + infinity). notify_sent(QPid, ChPid) -> gen_server2:cast(QPid, {notify_sent, ChPid}). diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index 51c1665b..2be00503 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -82,7 +82,8 @@ guid() -> %% and time. We combine that with a process-local counter to give %% us a GUID that is monotonically increasing per process. G = case get(guid) of - undefined -> {{gen_server:call(?SERVER, serial), self()}, 0}; + undefined -> {{gen_server:call(?SERVER, serial, infinity), self()}, + 0}; {S, I} -> {S, I+1} end, put(guid, G), diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 532be26d..3f9b6ebb 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -90,7 +90,7 @@ can_send(undefined, _QPid) -> can_send(LimiterPid, QPid) -> rabbit_misc:with_exit_handler( fun () -> true end, - fun () -> gen_server2:call(LimiterPid, {can_send, QPid}) end). + fun () -> gen_server2:call(LimiterPid, {can_send, QPid}, infinity) end). %% Let the limiter know that the channel has received some acks from a %% consumer diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index 99ae3f38..f4fa4599 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -95,7 +95,7 @@ start_link() -> transaction(MessageList) -> ?LOGDEBUG("transaction ~p~n", [MessageList]), TxnKey = rabbit_guid:guid(), - gen_server:call(?SERVER, {transaction, TxnKey, MessageList}). + gen_server:call(?SERVER, {transaction, TxnKey, MessageList}, infinity). extend_transaction(TxnKey, MessageList) -> ?LOGDEBUG("extend_transaction ~p ~p~n", [TxnKey, MessageList]), @@ -107,17 +107,17 @@ dirty_work(MessageList) -> commit_transaction(TxnKey) -> ?LOGDEBUG("commit_transaction ~p~n", [TxnKey]), - gen_server:call(?SERVER, {commit_transaction, TxnKey}). + gen_server:call(?SERVER, {commit_transaction, TxnKey}, infinity). rollback_transaction(TxnKey) -> ?LOGDEBUG("rollback_transaction ~p~n", [TxnKey]), gen_server:cast(?SERVER, {rollback_transaction, TxnKey}). force_snapshot() -> - gen_server:call(?SERVER, force_snapshot). + gen_server:call(?SERVER, force_snapshot, infinity). serial() -> - gen_server:call(?SERVER, serial). + gen_server:call(?SERVER, serial, infinity). %%-------------------------------------------------------------------- diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 9f642e35..ef8038e7 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -161,10 +161,10 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) -> {ok, Misc}. info(Pid) -> - gen_server:call(Pid, info). + gen_server:call(Pid, info, infinity). info(Pid, Items) -> - case gen_server:call(Pid, {info, Items}) of + case gen_server:call(Pid, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index ff42ea04..0b06a063 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -112,7 +112,8 @@ deliver_per_node(NodeQPids, Mandatory, Immediate, fun ({Node, QPids}) -> try gen_server2:call( {?SERVER, Node}, - {deliver, QPids, Mandatory, Immediate, Txn, Message}) + {deliver, QPids, Mandatory, Immediate, Txn, Message}, + infinity) catch _Class:_Reason -> %% TODO: figure out what to log (and do!) here |