summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-04-01 15:08:42 +0100
committerMatthias Radestock <matthias@lshift.net>2009-04-01 15:08:42 +0100
commit074f16acbd597a51c941f716cdba9c20fca13153 (patch)
treeff1c9a2e6baf9896d7d7c1a9c5946b1f71e8f013
parente44b23cbcacc4d531c1a61209cdca870d966e682 (diff)
downloadrabbitmq-server-bug20546.tar.gz
wait foreverbug20546
-rw-r--r--src/rabbit_alarm.erl3
-rw-r--r--src/rabbit_amqqueue.erl30
-rw-r--r--src/rabbit_guid.erl3
-rw-r--r--src/rabbit_limiter.erl2
-rw-r--r--src/rabbit_persister.erl8
-rw-r--r--src/rabbit_reader.erl4
-rw-r--r--src/rabbit_router.erl3
7 files changed, 28 insertions, 25 deletions
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 875624ba..21999f16 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -78,7 +78,8 @@ stop() ->
register(Pid, HighMemMFA) ->
ok = gen_event:call(alarm_handler, ?MODULE,
- {register, Pid, HighMemMFA}).
+ {register, Pid, HighMemMFA},
+ infinity).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c941f307..fdc99c38 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -213,10 +213,10 @@ list(VHostPath) ->
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
info(#amqqueue{ pid = QPid }) ->
- gen_server2:call(QPid, info).
+ gen_server2:call(QPid, info, infinity).
info(#amqqueue{ pid = QPid }, Items) ->
- case gen_server2:call(QPid, {info, Items}) of
+ case gen_server2:call(QPid, {info, Items}, infinity) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -225,20 +225,20 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
-stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat).
+stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity).
stat_all() ->
lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)).
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
- gen_server2:call(QPid, {delete, IfUnused, IfEmpty}).
+ gen_server2:call(QPid, {delete, IfUnused, IfEmpty}, infinity).
-purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge).
+purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity).
deliver(_IsMandatory, true, Txn, Message, QPid) ->
- gen_server2:call(QPid, {deliver_immediately, Txn, Message});
+ gen_server2:call(QPid, {deliver_immediately, Txn, Message}, infinity);
deliver(true, _IsImmediate, Txn, Message, QPid) ->
- gen_server2:call(QPid, {deliver, Txn, Message}),
+ gen_server2:call(QPid, {deliver, Txn, Message}, infinity),
true;
deliver(false, _IsImmediate, Txn, Message, QPid) ->
gen_server2:cast(QPid, {deliver, Txn, Message}),
@@ -254,10 +254,9 @@ ack(QPid, Txn, MsgIds, ChPid) ->
gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}).
commit_all(QPids, Txn) ->
- Timeout = length(QPids) * ?CALL_TIMEOUT,
safe_pmap_ok(
fun (QPid) -> exit({queue_disappeared, QPid}) end,
- fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, Timeout) end,
+ fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, infinity) end,
QPids).
rollback_all(QPids, Txn) ->
@@ -267,12 +266,11 @@ rollback_all(QPids, Txn) ->
QPids).
notify_down_all(QPids, ChPid) ->
- Timeout = length(QPids) * ?CALL_TIMEOUT,
safe_pmap_ok(
%% we don't care if the queue process has terminated in the
%% meantime
fun (_) -> ok end,
- fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, Timeout) end,
+ fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end,
QPids).
limit_all(QPids, ChPid, LimiterPid) ->
@@ -282,18 +280,20 @@ limit_all(QPids, ChPid, LimiterPid) ->
QPids).
claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
- gen_server2:call(QPid, {claim_queue, ReaderPid}).
+ gen_server2:call(QPid, {claim_queue, ReaderPid}, infinity).
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
- gen_server2:call(QPid, {basic_get, ChPid, NoAck}).
+ gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity).
basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg) ->
gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
- LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}).
+ LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg},
+ infinity).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
- ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
+ ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg},
+ infinity).
notify_sent(QPid, ChPid) ->
gen_server2:cast(QPid, {notify_sent, ChPid}).
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 51c1665b..2be00503 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -82,7 +82,8 @@ guid() ->
%% and time. We combine that with a process-local counter to give
%% us a GUID that is monotonically increasing per process.
G = case get(guid) of
- undefined -> {{gen_server:call(?SERVER, serial), self()}, 0};
+ undefined -> {{gen_server:call(?SERVER, serial, infinity), self()},
+ 0};
{S, I} -> {S, I+1}
end,
put(guid, G),
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 532be26d..3f9b6ebb 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -90,7 +90,7 @@ can_send(undefined, _QPid) ->
can_send(LimiterPid, QPid) ->
rabbit_misc:with_exit_handler(
fun () -> true end,
- fun () -> gen_server2:call(LimiterPid, {can_send, QPid}) end).
+ fun () -> gen_server2:call(LimiterPid, {can_send, QPid}, infinity) end).
%% Let the limiter know that the channel has received some acks from a
%% consumer
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index 99ae3f38..f4fa4599 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -95,7 +95,7 @@ start_link() ->
transaction(MessageList) ->
?LOGDEBUG("transaction ~p~n", [MessageList]),
TxnKey = rabbit_guid:guid(),
- gen_server:call(?SERVER, {transaction, TxnKey, MessageList}).
+ gen_server:call(?SERVER, {transaction, TxnKey, MessageList}, infinity).
extend_transaction(TxnKey, MessageList) ->
?LOGDEBUG("extend_transaction ~p ~p~n", [TxnKey, MessageList]),
@@ -107,17 +107,17 @@ dirty_work(MessageList) ->
commit_transaction(TxnKey) ->
?LOGDEBUG("commit_transaction ~p~n", [TxnKey]),
- gen_server:call(?SERVER, {commit_transaction, TxnKey}).
+ gen_server:call(?SERVER, {commit_transaction, TxnKey}, infinity).
rollback_transaction(TxnKey) ->
?LOGDEBUG("rollback_transaction ~p~n", [TxnKey]),
gen_server:cast(?SERVER, {rollback_transaction, TxnKey}).
force_snapshot() ->
- gen_server:call(?SERVER, force_snapshot).
+ gen_server:call(?SERVER, force_snapshot, infinity).
serial() ->
- gen_server:call(?SERVER, serial).
+ gen_server:call(?SERVER, serial, infinity).
%%--------------------------------------------------------------------
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 9f642e35..ef8038e7 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -161,10 +161,10 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) ->
{ok, Misc}.
info(Pid) ->
- gen_server:call(Pid, info).
+ gen_server:call(Pid, info, infinity).
info(Pid, Items) ->
- case gen_server:call(Pid, {info, Items}) of
+ case gen_server:call(Pid, {info, Items}, infinity) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index ff42ea04..0b06a063 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -112,7 +112,8 @@ deliver_per_node(NodeQPids, Mandatory, Immediate,
fun ({Node, QPids}) ->
try gen_server2:call(
{?SERVER, Node},
- {deliver, QPids, Mandatory, Immediate, Txn, Message})
+ {deliver, QPids, Mandatory, Immediate, Txn, Message},
+ infinity)
catch
_Class:_Reason ->
%% TODO: figure out what to log (and do!) here