diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-16 15:51:26 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-16 15:51:26 +0000 |
commit | efd407517c1cc77495d187ec1e88fc7d89c6e06b (patch) | |
tree | 3b5060256d08831a2dca99067a99c89ab6f16f37 /src/rabbit_mirror_queue_master.erl | |
parent | 6a380541b983575c44b5e5c022855d94c92ebda8 (diff) | |
download | rabbitmq-server-efd407517c1cc77495d187ec1e88fc7d89c6e06b.tar.gz |
Avoid "Absurdly large distribution output data buffer" death.
Diffstat (limited to 'src/rabbit_mirror_queue_master.erl')
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 11 |
1 files changed, 9 insertions, 2 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index d9cef642..57bf5b33 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -212,7 +212,7 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, backing_queue = BQ, backing_queue_state = BQS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}), + ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}, msg_size(Msg)), BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQS), ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }). @@ -222,7 +222,8 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps, backing_queue = BQ, backing_queue_state = BQS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}), + ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}, + msg_size(Msg)), {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS), State1 = State #state { backing_queue_state = BQS1 }, {AckTag, ensure_monitoring(ChPid, State1)}. @@ -479,3 +480,9 @@ ensure_monitoring(ChPid, State = #state { coordinator = CPid, CPid, [ChPid]), State #state { known_senders = sets:add_element(ChPid, KS) } end. + +msg_size(#basic_message{content = #content{payload_fragments_rev = PFR}}) -> + msg_size(PFR, 0). + +msg_size([], Size) -> Size; +msg_size([H|T], Size) -> msg_size(T, Size + size(H)). |