summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-03-16 12:58:11 +0000
committerSimon MacMullen <simon@rabbitmq.com>2011-03-16 12:58:11 +0000
commita6586a1c333ce2499a787c07ec7cb2c8a2cfc180 (patch)
treeb6b6c01a37769196317c649c4dd855c1b979bff2
parentd38fe2887d29b677a15eb6bdad2ea55ce76c5405 (diff)
downloadrabbitmq-server-a6586a1c333ce2499a787c07ec7cb2c8a2cfc180.tar.gz
Transplant reader_pid vs connection_pid work from bug23350.
-rw-r--r--src/rabbit_channel.erl39
-rw-r--r--src/rabbit_channel_sup.erl15
-rw-r--r--src/rabbit_direct.erl14
3 files changed, 35 insertions, 33 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index da103284..b27f6886 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -20,7 +20,7 @@
-behaviour(gen_server2).
--export([start_link/9, do/2, do/3, flush/1, shutdown/1]).
+-export([start_link/10, do/2, do/3, flush/1, shutdown/1]).
-export([send_command/2, deliver/4, flushed/2, confirm/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([emit_stats/1, ready_for_close/1]).
@@ -29,9 +29,9 @@
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
prioritise_cast/2]).
--record(ch, {state, protocol, channel, reader_pid, writer_pid, limiter_pid,
- start_limiter_fun, transaction_id, tx_participants, next_tag,
- uncommitted_ack_q, unacked_message_q,
+-record(ch, {state, protocol, channel, reader_pid, writer_pid, connection_pid,
+ limiter_pid, start_limiter_fun, transaction_id, tx_participants,
+ next_tag, uncommitted_ack_q, unacked_message_q,
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, consumer_monitors, queue_collector_pid,
stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
@@ -67,8 +67,8 @@
-type(channel_number() :: non_neg_integer()).
--spec(start_link/9 ::
- (channel_number(), pid(), pid(), rabbit_types:protocol(),
+-spec(start_link/10 ::
+ (channel_number(), pid(), pid(), pid(), rabbit_types:protocol(),
rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
pid(), fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) ->
rabbit_types:ok_pid_or_error()).
@@ -96,11 +96,11 @@
%%----------------------------------------------------------------------------
-start_link(Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities,
- CollectorPid, StartLimiterFun) ->
+start_link(Channel, ReaderPid, WriterPid, ConnectionPid, Protocol, User, VHost,
+ Capabilities, CollectorPid, StartLimiterFun) ->
gen_server2:start_link(
- ?MODULE, [Channel, ReaderPid, WriterPid, Protocol, User, VHost,
- Capabilities, CollectorPid, StartLimiterFun], []).
+ ?MODULE, [Channel, ReaderPid, WriterPid, ConnectionPid, Protocol, User,
+ VHost, Capabilities, CollectorPid, StartLimiterFun], []).
do(Pid, Method) ->
do(Pid, Method, none).
@@ -154,8 +154,8 @@ ready_for_close(Pid) ->
%%---------------------------------------------------------------------------
-init([Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities,
- CollectorPid, StartLimiterFun]) ->
+init([Channel, ReaderPid, WriterPid, ConnectionPid, Protocol, User, VHost,
+ Capabilities, CollectorPid, StartLimiterFun]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
StatsTimer = rabbit_event:init_stats_timer(),
@@ -164,6 +164,7 @@ init([Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities,
channel = Channel,
reader_pid = ReaderPid,
writer_pid = WriterPid,
+ connection_pid = ConnectionPid,
limiter_pid = undefined,
start_limiter_fun = StartLimiterFun,
transaction_id = none,
@@ -1410,13 +1411,13 @@ coalesce_and_send(MsgSeqNos, MkMsgFun,
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
-i(pid, _) -> self();
-i(connection, #ch{reader_pid = ReaderPid}) -> ReaderPid;
-i(number, #ch{channel = Channel}) -> Channel;
-i(user, #ch{user = User}) -> User#user.username;
-i(vhost, #ch{virtual_host = VHost}) -> VHost;
-i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none;
-i(confirm, #ch{confirm_enabled = CE}) -> CE;
+i(pid, _) -> self();
+i(connection, #ch{connection_pid = Connection}) -> Connection;
+i(number, #ch{channel = Channel}) -> Channel;
+i(user, #ch{user = User}) -> User#user.username;
+i(vhost, #ch{virtual_host = VHost}) -> VHost;
+i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none;
+i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);
i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) ->
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index 8175ad80..7eec0818 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -58,21 +58,22 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, User, VHost,
supervisor2:start_child(
SupPid,
{channel, {rabbit_channel, start_link,
- [Channel, ReaderPid, WriterPid, Protocol, User, VHost,
- Capabilities, Collector, start_limiter_fun(SupPid)]},
+ [Channel, ReaderPid, WriterPid, ReaderPid, Protocol,
+ User, VHost, Capabilities, Collector,
+ start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, AState} = rabbit_command_assembler:init(Protocol),
{ok, SupPid, {ChannelPid, AState}};
-start_link({direct, Channel, ClientChannelPid, Protocol, User, VHost,
- Capabilities, Collector}) ->
+start_link({direct, Channel, ClientChannelPid, ConnectionPid, Protocol, User,
+ VHost, Capabilities, Collector}) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, ChannelPid} =
supervisor2:start_child(
SupPid,
{channel, {rabbit_channel, start_link,
- [Channel, ClientChannelPid, ClientChannelPid, Protocol,
- User, VHost, Capabilities, Collector,
- start_limiter_fun(SupPid)]},
+ [Channel, ClientChannelPid, ClientChannelPid,
+ ConnectionPid, Protocol, User, VHost, Capabilities,
+ Collector, start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, SupPid, {ChannelPid, none}}.
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index a2693c69..568cbea3 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -16,7 +16,7 @@
-module(rabbit_direct).
--export([boot/0, connect/4, start_channel/7]).
+-export([boot/0, connect/4, start_channel/8]).
-include("rabbit.hrl").
@@ -28,8 +28,8 @@
-spec(connect/4 :: (binary(), binary(), binary(), rabbit_types:protocol()) ->
{'ok', {rabbit_types:user(),
rabbit_framing:amqp_table()}}).
--spec(start_channel/7 ::
- (rabbit_channel:channel_number(), pid(), rabbit_types:protocol(),
+-spec(start_channel/8 ::
+ (rabbit_channel:channel_number(), pid(), pid(), rabbit_types:protocol(),
rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
pid()) -> {'ok', pid()}).
@@ -69,11 +69,11 @@ connect(Username, Password, VHost, Protocol) ->
{error, broker_not_found_on_node}
end.
-start_channel(Number, ClientChannelPid, Protocol, User, VHost, Capabilities,
- Collector) ->
+start_channel(Number, ClientChannelPid, ConnectionPid, Protocol, User, VHost,
+ Capabilities, Collector) ->
{ok, _, {ChannelPid, _}} =
supervisor2:start_child(
rabbit_direct_client_sup,
- [{direct, Number, ClientChannelPid, Protocol, User, VHost,
- Capabilities, Collector}]),
+ [{direct, Number, ClientChannelPid, ConnectionPid, Protocol, User,
+ VHost, Capabilities, Collector}]),
{ok, ChannelPid}.