summaryrefslogtreecommitdiff
path: root/src/rabbit_mirror_queue_master.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-16 15:51:26 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-16 15:51:26 +0000
commitefd407517c1cc77495d187ec1e88fc7d89c6e06b (patch)
tree3b5060256d08831a2dca99067a99c89ab6f16f37 /src/rabbit_mirror_queue_master.erl
parent6a380541b983575c44b5e5c022855d94c92ebda8 (diff)
downloadrabbitmq-server-efd407517c1cc77495d187ec1e88fc7d89c6e06b.tar.gz
Avoid "Absurdly large distribution output data buffer" death.
Diffstat (limited to 'src/rabbit_mirror_queue_master.erl')
-rw-r--r--src/rabbit_mirror_queue_master.erl11
1 files changed, 9 insertions, 2 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index d9cef642..57bf5b33 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -212,7 +212,7 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}),
+ ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}, msg_size(Msg)),
BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQS),
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
@@ -222,7 +222,8 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}),
+ ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg},
+ msg_size(Msg)),
{AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
State1 = State #state { backing_queue_state = BQS1 },
{AckTag, ensure_monitoring(ChPid, State1)}.
@@ -479,3 +480,9 @@ ensure_monitoring(ChPid, State = #state { coordinator = CPid,
CPid, [ChPid]),
State #state { known_senders = sets:add_element(ChPid, KS) }
end.
+
+msg_size(#basic_message{content = #content{payload_fragments_rev = PFR}}) ->
+ msg_size(PFR, 0).
+
+msg_size([], Size) -> Size;
+msg_size([H|T], Size) -> msg_size(T, Size + size(H)).