diff options
author | Emile Joubert <emile@rabbitmq.com> | 2012-04-23 17:57:11 +0100 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2012-04-23 17:57:11 +0100 |
commit | afe952206ba04359c334337321ada308fc7280a8 (patch) | |
tree | 75d91cc346e96de4f962c70f07e96d2cb30a0d86 | |
parent | 80c6beed2eb9510a8ec3bbf2e515f27f35d23089 (diff) | |
parent | af51afc1653f566dd425ad73bf04f97a73837afd (diff) | |
download | rabbitmq-server-afe952206ba04359c334337321ada308fc7280a8.tar.gz |
Merged bug24885 into default
-rw-r--r-- | packaging/common/rabbitmq-server.init | 2 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/rabbitmq-server.default | 9 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/rules | 1 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 80 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 2 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 41 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 2 |
7 files changed, 83 insertions, 54 deletions
diff --git a/packaging/common/rabbitmq-server.init b/packaging/common/rabbitmq-server.init index c942f8e3..40238c8e 100644 --- a/packaging/common/rabbitmq-server.init +++ b/packaging/common/rabbitmq-server.init @@ -35,6 +35,8 @@ test -x $CONTROL || exit 0 RETVAL=0 set -e +[ -f /etc/default/${NAME} ] && . /etc/default/${NAME} + ensure_pid_dir () { PID_DIR=`dirname ${PID_FILE}` if [ ! -d ${PID_DIR} ] ; then diff --git a/packaging/debs/Debian/debian/rabbitmq-server.default b/packaging/debs/Debian/debian/rabbitmq-server.default new file mode 100644 index 00000000..bde5e308 --- /dev/null +++ b/packaging/debs/Debian/debian/rabbitmq-server.default @@ -0,0 +1,9 @@ +# This file is sourced by /etc/init.d/rabbitmq-server. Its primary +# reason for existing is to allow adjustment of system limits for the +# rabbitmq-server process. +# +# Maximum number of open file handles. This will need to be increased +# to handle many simultaneous connections. Refer to the system +# documentation for ulimit (in man bash) for more information. +# +#ulimit -n 1024 diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules index 108b1ed5..ecb778df 100644 --- a/packaging/debs/Debian/debian/rules +++ b/packaging/debs/Debian/debian/rules @@ -19,3 +19,4 @@ install/rabbitmq-server:: done sed -e 's|@RABBIT_LIB@|/usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)|g' <debian/postrm.in >debian/postrm install -p -D -m 0755 debian/rabbitmq-server.ocf $(DEB_DESTDIR)usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server + install -p -D -m 0644 debian/rabbitmq-server.default $(DEB_DESTDIR)etc/default/rabbitmq-server diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0cf7de40..5701efeb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -723,6 +723,14 @@ ensure_ttl_timer(State = #q{backing_queue = BQ, ensure_ttl_timer(State) -> State. +ack_if_no_dlx(AckTags, State = #q{dlx = undefined, + backing_queue = BQ, + backing_queue_state = BQS }) -> + {_Guids, BQS1} = BQ:ack(AckTags, BQS), + State#q{backing_queue_state = BQS1}; +ack_if_no_dlx(_AckTags, State) -> + State. + dead_letter_fun(_Reason, #q{dlx = undefined}) -> undefined; dead_letter_fun(Reason, _State) -> @@ -730,31 +738,24 @@ dead_letter_fun(Reason, _State) -> gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason}) end. -dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) -> - case rabbit_exchange:lookup(DLX) of - {error, not_found} -> noreply(State); - _ -> dead_letter_msg_existing_dlx(Msg, AckTag, Reason, - State) +dead_letter_publish(Msg, Reason, State = #q{publish_seqno = MsgSeqNo}) -> + DLMsg = #basic_message{exchange_name = XName} = + make_dead_letter_msg(Reason, Msg, State), + case rabbit_exchange:lookup(XName) of + {ok, X} -> + Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo), + {Queues, Cycles} = detect_dead_letter_cycles( + DLMsg, rabbit_exchange:route(X, Delivery)), + lists:foreach(fun log_cycle_once/1, Cycles), + QPids = rabbit_amqqueue:lookup(Queues), + {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery), + DeliveredQPids; + {error, not_found} -> + [] end. -dead_letter_publish(Msg, Reason, - State = #q{publish_seqno = MsgSeqNo, - dlx = DLX}) -> - Delivery = #delivery{message = #basic_message{exchange_name = XName}} = - rabbit_basic:delivery( - false, false, make_dead_letter_msg(DLX, Reason, Msg, State), - MsgSeqNo), - {ok, X} = rabbit_exchange:lookup(XName), - Queues = rabbit_exchange:route(X, Delivery), - {Queues1, Cycles} = detect_dead_letter_cycles(Delivery, Queues), - lists:foreach(fun log_cycle_once/1, Cycles), - QPids = rabbit_amqqueue:lookup(Queues1), - {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery), - DeliveredQPids. - -dead_letter_msg_existing_dlx(Msg, AckTag, Reason, - State = #q{publish_seqno = MsgSeqNo, - unconfirmed = UC}) -> +dead_letter_msg(Msg, AckTag, Reason, State = #q{publish_seqno = MsgSeqNo, + unconfirmed = UC}) -> QPids = dead_letter_publish(Msg, Reason, State), State1 = State#q{queue_monitors = pmon:monitor_all( QPids, State#q.queue_monitors), @@ -813,8 +814,7 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS, false -> noreply(State1) end. -detect_dead_letter_cycles(#delivery{message = #basic_message{content = Content}}, - Queues) -> +detect_dead_letter_cycles(#basic_message{content = Content}, Queues) -> #content{properties = #'P_basic'{headers = Headers}} = rabbit_binary_parser:ensure_content_decoded(Content), NoCycles = {Queues, []}, @@ -841,31 +841,31 @@ detect_dead_letter_cycles(#delivery{message = #basic_message{content = Content}} end end. -make_dead_letter_msg(DLX, Reason, +make_dead_letter_msg(Reason, Msg = #basic_message{content = Content, exchange_name = Exchange, routing_keys = RoutingKeys}, - State = #q{dlx_routing_key = DlxRoutingKey}) -> + State = #q{dlx = DLX, dlx_routing_key = DlxRoutingKey}) -> {DeathRoutingKeys, HeadersFun1} = case DlxRoutingKey of undefined -> {RoutingKeys, fun (H) -> H end}; _ -> {[DlxRoutingKey], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end} end, + ReasonBin = list_to_binary(atom_to_list(Reason)), #resource{name = QName} = qname(State), + TimeSec = rabbit_misc:now_ms() div 1000, HeadersFun2 = fun (Headers) -> %% The first routing key is the one specified in the %% basic.publish; all others are CC or BCC keys. - RoutingKeys1 = - [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], - Info = [{<<"reason">>, - longstr, list_to_binary(atom_to_list(Reason))}, - {<<"queue">>, longstr, QName}, - {<<"time">>, timestamp, rabbit_misc:now_ms() div 1000}, - {<<"exchange">>, longstr, Exchange#resource.name}, - {<<"routing-keys">>, array, - [{longstr, Key} || Key <- RoutingKeys1]}], + RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], + RKs1 = [{longstr, Key} || Key <- RKs], + Info = [{<<"reason">>, longstr, ReasonBin}, + {<<"queue">>, longstr, QName}, + {<<"time">>, timestamp, TimeSec}, + {<<"exchange">>, longstr, Exchange#resource.name}, + {<<"routing-keys">>, array, RKs1}], HeadersFun1(rabbit_basic:append_table_header(<<"x-death">>, Info, Headers)) end, @@ -1240,11 +1240,13 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) -> ChPid, AckTags, State, case Requeue of true -> fun (State1) -> requeue_and_run(AckTags, State1) end; - false -> Fun = dead_letter_fun(rejected, State), - fun (State1 = #q{backing_queue = BQ, + false -> fun (State1 = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + Fun = dead_letter_fun(rejected, State1), BQS1 = BQ:fold(Fun, BQS, AckTags), - State1#q{backing_queue_state = BQS1} + ack_if_no_dlx( + AckTags, + State1#q{backing_queue_state = BQS1}) end end)); diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 8ad59016..17d848da 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -63,7 +63,7 @@ -spec(extract_headers/1 :: (rabbit_types:content()) -> headers()). --spec(map_headers/2 :: (rabbit_types:content(), fun((headers()) -> headers())) +-spec(map_headers/2 :: (fun((headers()) -> headers()), rabbit_types:content()) -> rabbit_types:content()). -spec(header_routes/1 :: diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 846890a1..22c6a223 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -36,9 +36,9 @@ conn_name, limiter, tx_status, next_tag, unacked_message_q, uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user, virtual_host, most_recently_declared_queue, queue_monitors, - consumer_mapping, blocking, queue_consumers, queue_collector_pid, - stats_timer, confirm_enabled, publish_seqno, unconfirmed, - confirmed, capabilities, trace_state}). + consumer_mapping, blocking, queue_consumers, delivering_queues, + queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, + unconfirmed, confirmed, capabilities, trace_state}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -198,6 +198,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, consumer_mapping = dict:new(), blocking = sets:new(), queue_consumers = dict:new(), + delivering_queues = sets:new(), queue_collector_pid = CollectorPid, confirm_enabled = false, publish_seqno = 1, @@ -331,10 +332,11 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State1 = handle_publishing_queue_down(QPid, Reason, State), State2 = queue_blocked(QPid, State1), State3 = handle_consuming_queue_down(QPid, State2), + State4 = handle_delivering_queue_down(QPid, State3), credit_flow:peer_down(QPid), erase_queue_stats(QPid), noreply(State3#ch{queue_monitors = pmon:erase( - QPid, State3#ch.queue_monitors)}); + QPid, State4#ch.queue_monitors)}); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. @@ -657,7 +659,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, QueueName, ConnPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, - Msg = {_QName, _QPid, _MsgId, Redelivered, + Msg = {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, routing_keys = [RoutingKey | _CcRoutes], content = Content}}} -> @@ -669,7 +671,8 @@ handle_method(#'basic.get'{queue = QueueNameBin, routing_key = RoutingKey, message_count = MessageCount}, Content), - {noreply, record_sent(none, not(NoAck), Msg, State)}; + State1 = monitor_delivering_queue(NoAck, QPid, State), + {noreply, record_sent(none, not(NoAck), Msg, State1)}; empty -> {reply, #'basic.get_empty'{}, State} end; @@ -707,10 +710,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin, consumer_tag = ActualConsumerTag})), Q} end) of - {ok, Q} -> - State1 = State#ch{consumer_mapping = - dict:store(ActualConsumerTag, Q, - ConsumerMapping)}, + {ok, Q = #amqqueue{pid = QPid}} -> + CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping), + State1 = monitor_delivering_queue( + NoAck, QPid, State#ch{consumer_mapping = CM1}), {noreply, case NoWait of true -> consumer_monitor(ActualConsumerTag, State1); @@ -1108,6 +1111,13 @@ consumer_monitor(ConsumerTag, State end. +monitor_delivering_queue(true, _QPid, State) -> + State; +monitor_delivering_queue(false, QPid, State = #ch{queue_monitors = QMons, + delivering_queues = DQ}) -> + State#ch{queue_monitors = pmon:monitor(QPid, QMons), + delivering_queues = sets:add_element(QPid, DQ)}. + handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) -> case rabbit_misc:is_abnormal_termination(Reason) of true -> {MXs, UC1} = dtree:take_all(QPid, UC), @@ -1134,6 +1144,9 @@ handle_consuming_queue_down(QPid, State#ch{consumer_mapping = ConsumerMapping1, queue_consumers = dict:erase(QPid, QCons)}. +handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> + State#ch{delivering_queues = sets:del_element(QPid, DQ)}. + binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, @@ -1269,9 +1282,11 @@ new_tx(State) -> State#ch{uncommitted_message_q = queue:new(), notify_queues(State = #ch{state = closing}) -> {ok, State}; -notify_queues(State = #ch{consumer_mapping = Consumers}) -> - {rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()), - State#ch{state = closing}}. +notify_queues(State = #ch{consumer_mapping = Consumers, + delivering_queues = DQ }) -> + QPids = sets:to_list( + sets:union(sets:from_list(consumer_queues(Consumers)), DQ)), + {rabbit_amqqueue:notify_down_all(QPids, self()), State#ch{state = closing}}. fold_per_queue(_F, Acc, []) -> Acc; diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 2d155d14..17e2ffb4 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -356,7 +356,7 @@ handle_cast(request_length, State = #state { length_fun = LengthFun }) -> handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) -> noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }). -handle_info(send_gm_heartbeat, State = #state{gm = GM}) -> +handle_info(send_gm_heartbeat, State = #state { gm = GM }) -> gm:broadcast(GM, heartbeat), ensure_gm_heartbeat(), noreply(State); |