diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2011-03-16 12:58:11 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2011-03-16 12:58:11 +0000 |
commit | a6586a1c333ce2499a787c07ec7cb2c8a2cfc180 (patch) | |
tree | b6b6c01a37769196317c649c4dd855c1b979bff2 | |
parent | d38fe2887d29b677a15eb6bdad2ea55ce76c5405 (diff) | |
download | rabbitmq-server-a6586a1c333ce2499a787c07ec7cb2c8a2cfc180.tar.gz |
Transplant reader_pid vs connection_pid work from bug23350.
-rw-r--r-- | src/rabbit_channel.erl | 39 | ||||
-rw-r--r-- | src/rabbit_channel_sup.erl | 15 | ||||
-rw-r--r-- | src/rabbit_direct.erl | 14 |
3 files changed, 35 insertions, 33 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index da103284..b27f6886 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -20,7 +20,7 @@ -behaviour(gen_server2). --export([start_link/9, do/2, do/3, flush/1, shutdown/1]). +-export([start_link/10, do/2, do/3, flush/1, shutdown/1]). -export([send_command/2, deliver/4, flushed/2, confirm/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([emit_stats/1, ready_for_close/1]). @@ -29,9 +29,9 @@ handle_info/2, handle_pre_hibernate/1, prioritise_call/3, prioritise_cast/2]). --record(ch, {state, protocol, channel, reader_pid, writer_pid, limiter_pid, - start_limiter_fun, transaction_id, tx_participants, next_tag, - uncommitted_ack_q, unacked_message_q, +-record(ch, {state, protocol, channel, reader_pid, writer_pid, connection_pid, + limiter_pid, start_limiter_fun, transaction_id, tx_participants, + next_tag, uncommitted_ack_q, unacked_message_q, user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, consumer_monitors, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, @@ -67,8 +67,8 @@ -type(channel_number() :: non_neg_integer()). --spec(start_link/9 :: - (channel_number(), pid(), pid(), rabbit_types:protocol(), +-spec(start_link/10 :: + (channel_number(), pid(), pid(), pid(), rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), pid(), fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) -> rabbit_types:ok_pid_or_error()). @@ -96,11 +96,11 @@ %%---------------------------------------------------------------------------- -start_link(Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities, - CollectorPid, StartLimiterFun) -> +start_link(Channel, ReaderPid, WriterPid, ConnectionPid, Protocol, User, VHost, + Capabilities, CollectorPid, StartLimiterFun) -> gen_server2:start_link( - ?MODULE, [Channel, ReaderPid, WriterPid, Protocol, User, VHost, - Capabilities, CollectorPid, StartLimiterFun], []). + ?MODULE, [Channel, ReaderPid, WriterPid, ConnectionPid, Protocol, User, + VHost, Capabilities, CollectorPid, StartLimiterFun], []). do(Pid, Method) -> do(Pid, Method, none). @@ -154,8 +154,8 @@ ready_for_close(Pid) -> %%--------------------------------------------------------------------------- -init([Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities, - CollectorPid, StartLimiterFun]) -> +init([Channel, ReaderPid, WriterPid, ConnectionPid, Protocol, User, VHost, + Capabilities, CollectorPid, StartLimiterFun]) -> process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), StatsTimer = rabbit_event:init_stats_timer(), @@ -164,6 +164,7 @@ init([Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities, channel = Channel, reader_pid = ReaderPid, writer_pid = WriterPid, + connection_pid = ConnectionPid, limiter_pid = undefined, start_limiter_fun = StartLimiterFun, transaction_id = none, @@ -1410,13 +1411,13 @@ coalesce_and_send(MsgSeqNos, MkMsgFun, infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. -i(pid, _) -> self(); -i(connection, #ch{reader_pid = ReaderPid}) -> ReaderPid; -i(number, #ch{channel = Channel}) -> Channel; -i(user, #ch{user = User}) -> User#user.username; -i(vhost, #ch{virtual_host = VHost}) -> VHost; -i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none; -i(confirm, #ch{confirm_enabled = CE}) -> CE; +i(pid, _) -> self(); +i(connection, #ch{connection_pid = Connection}) -> Connection; +i(number, #ch{channel = Channel}) -> Channel; +i(user, #ch{user = User}) -> User#user.username; +i(vhost, #ch{virtual_host = VHost}) -> VHost; +i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none; +i(confirm, #ch{confirm_enabled = CE}) -> CE; i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) -> diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 8175ad80..7eec0818 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -58,21 +58,22 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, User, VHost, supervisor2:start_child( SupPid, {channel, {rabbit_channel, start_link, - [Channel, ReaderPid, WriterPid, Protocol, User, VHost, - Capabilities, Collector, start_limiter_fun(SupPid)]}, + [Channel, ReaderPid, WriterPid, ReaderPid, Protocol, + User, VHost, Capabilities, Collector, + start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, AState} = rabbit_command_assembler:init(Protocol), {ok, SupPid, {ChannelPid, AState}}; -start_link({direct, Channel, ClientChannelPid, Protocol, User, VHost, - Capabilities, Collector}) -> +start_link({direct, Channel, ClientChannelPid, ConnectionPid, Protocol, User, + VHost, Capabilities, Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, ChannelPid} = supervisor2:start_child( SupPid, {channel, {rabbit_channel, start_link, - [Channel, ClientChannelPid, ClientChannelPid, Protocol, - User, VHost, Capabilities, Collector, - start_limiter_fun(SupPid)]}, + [Channel, ClientChannelPid, ClientChannelPid, + ConnectionPid, Protocol, User, VHost, Capabilities, + Collector, start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, SupPid, {ChannelPid, none}}. diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index a2693c69..568cbea3 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -16,7 +16,7 @@ -module(rabbit_direct). --export([boot/0, connect/4, start_channel/7]). +-export([boot/0, connect/4, start_channel/8]). -include("rabbit.hrl"). @@ -28,8 +28,8 @@ -spec(connect/4 :: (binary(), binary(), binary(), rabbit_types:protocol()) -> {'ok', {rabbit_types:user(), rabbit_framing:amqp_table()}}). --spec(start_channel/7 :: - (rabbit_channel:channel_number(), pid(), rabbit_types:protocol(), +-spec(start_channel/8 :: + (rabbit_channel:channel_number(), pid(), pid(), rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), pid()) -> {'ok', pid()}). @@ -69,11 +69,11 @@ connect(Username, Password, VHost, Protocol) -> {error, broker_not_found_on_node} end. -start_channel(Number, ClientChannelPid, Protocol, User, VHost, Capabilities, - Collector) -> +start_channel(Number, ClientChannelPid, ConnectionPid, Protocol, User, VHost, + Capabilities, Collector) -> {ok, _, {ChannelPid, _}} = supervisor2:start_child( rabbit_direct_client_sup, - [{direct, Number, ClientChannelPid, Protocol, User, VHost, - Capabilities, Collector}]), + [{direct, Number, ClientChannelPid, ConnectionPid, Protocol, User, + VHost, Capabilities, Collector}]), {ok, ChannelPid}. |