summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-03-16 14:29:58 +0000
committerSimon MacMullen <simon@rabbitmq.com>2011-03-16 14:29:58 +0000
commit23d18d56c8a9ec6e12410038253c090b817ae293 (patch)
tree0051c6f5bf757b7b6dd7be629e2396e84243d71b
parent894201a627dae3aec33648d047bafa85d9cf9567 (diff)
downloadrabbitmq-server-git-23d18d56c8a9ec6e12410038253c090b817ae293.tar.gz
Say "ConnPid" everywhere.
-rw-r--r--src/rabbit_channel.erl64
-rw-r--r--src/rabbit_channel_sup.erl10
-rw-r--r--src/rabbit_direct.erl6
3 files changed, 40 insertions, 40 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 370654a91a..2d2d9d6038 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -29,7 +29,7 @@
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
prioritise_cast/2]).
--record(ch, {state, protocol, channel, reader_pid, writer_pid, connection_pid,
+-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
limiter_pid, start_limiter_fun, transaction_id, tx_participants,
next_tag, uncommitted_ack_q, unacked_message_q,
user, virtual_host, most_recently_declared_queue,
@@ -96,10 +96,10 @@
%%----------------------------------------------------------------------------
-start_link(Channel, ReaderPid, WriterPid, ConnectionPid, Protocol, User, VHost,
+start_link(Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
Capabilities, CollectorPid, StartLimiterFun) ->
gen_server2:start_link(
- ?MODULE, [Channel, ReaderPid, WriterPid, ConnectionPid, Protocol, User,
+ ?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, Protocol, User,
VHost, Capabilities, CollectorPid, StartLimiterFun], []).
do(Pid, Method) ->
@@ -154,7 +154,7 @@ ready_for_close(Pid) ->
%%---------------------------------------------------------------------------
-init([Channel, ReaderPid, WriterPid, ConnectionPid, Protocol, User, VHost,
+init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
Capabilities, CollectorPid, StartLimiterFun]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
@@ -164,7 +164,7 @@ init([Channel, ReaderPid, WriterPid, ConnectionPid, Protocol, User, VHost,
channel = Channel,
reader_pid = ReaderPid,
writer_pid = WriterPid,
- connection_pid = ConnectionPid,
+ conn_pid = ConnPid,
limiter_pid = undefined,
start_limiter_fun = StartLimiterFun,
transaction_id = none,
@@ -362,15 +362,15 @@ return_ok(State, false, Msg) -> {reply, Msg, State}.
ok_msg(true, _Msg) -> undefined;
ok_msg(false, Msg) -> Msg.
-send_exception(Reason, State = #ch{protocol = Protocol,
- channel = Channel,
- writer_pid = WriterPid,
- reader_pid = ReaderPid,
- connection_pid = ConnectionPid}) ->
+send_exception(Reason, State = #ch{protocol = Protocol,
+ channel = Channel,
+ writer_pid = WriterPid,
+ reader_pid = ReaderPid,
+ conn_pid = ConnPid}) ->
{CloseChannel, CloseMethod} =
rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
rabbit_log:error("connection ~p, channel ~p - error:~n~p~n",
- [ConnectionPid, Channel, Reason]),
+ [ConnPid, Channel, Reason]),
%% something bad's happened: rollback_and_notify may not be 'ok'
{_Result, State1} = rollback_and_notify(State),
case CloseChannel of
@@ -652,12 +652,12 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
_, State = #ch{writer_pid = WriterPid,
- connection_pid = ConnectionPid,
- next_tag = DeliveryTag}) ->
+ conn_pid = ConnPid,
+ next_tag = DeliveryTag}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_exclusive_access_or_die(
- QueueName, ConnectionPid,
+ QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
Msg = {_QName, QPid, _MsgId, Redelivered,
@@ -691,7 +691,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
no_ack = NoAck,
exclusive = ExclusiveConsume,
nowait = NoWait},
- _, State = #ch{connection_pid = ConnectionPid,
+ _, State = #ch{conn_pid = ConnPid,
limiter_pid = LimiterPid,
consumer_mapping = ConsumerMapping}) ->
case dict:find(ConsumerTag, ConsumerMapping) of
@@ -708,7 +708,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
%% behalf. This is for symmetry with basic.cancel - see
%% the comment in that method for why.
case rabbit_amqqueue:with_exclusive_access_or_die(
- QueueName, ConnectionPid,
+ QueueName, ConnPid,
fun (Q) ->
{rabbit_amqqueue:basic_consume(
Q, NoAck, self(), LimiterPid,
@@ -923,10 +923,10 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
nowait = NoWait,
arguments = Args} = Declare,
_, State = #ch{virtual_host = VHostPath,
- connection_pid = ConnectionPid,
+ conn_pid = ConnPid,
queue_collector_pid = CollectorPid}) ->
Owner = case ExclusiveDeclare of
- true -> ConnectionPid;
+ true -> ConnPid;
false -> none
end,
ActualNameBin = case QueueNameBin of
@@ -969,13 +969,13 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
passive = true,
nowait = NoWait},
_, State = #ch{virtual_host = VHostPath,
- connection_pid = ConnectionPid}) ->
+ conn_pid = ConnPid}) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
check_configure_permitted(QueueName, State),
{{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} =
rabbit_amqqueue:with_or_die(
QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end),
- ok = rabbit_amqqueue:check_exclusive_access(Q, ConnectionPid),
+ ok = rabbit_amqqueue:check_exclusive_access(Q, ConnPid),
return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount,
State);
@@ -983,11 +983,11 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
if_unused = IfUnused,
if_empty = IfEmpty,
nowait = NoWait},
- _, State = #ch{connection_pid = ConnectionPid}) ->
+ _, State = #ch{conn_pid = ConnPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_configure_permitted(QueueName, State),
case rabbit_amqqueue:with_exclusive_access_or_die(
- QueueName, ConnectionPid,
+ QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
{error, in_use} ->
rabbit_misc:protocol_error(
@@ -1019,11 +1019,11 @@ handle_method(#'queue.unbind'{queue = QueueNameBin,
handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
- _, State = #ch{connection_pid = ConnectionPid}) ->
+ _, State = #ch{conn_pid = ConnPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
{ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die(
- QueueName, ConnectionPid,
+ QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:purge(Q) end),
return_ok(State, NoWait,
#'queue.purge_ok'{message_count = PurgedMessageCount});
@@ -1144,7 +1144,7 @@ handle_consuming_queue_down(MRef, ConsumerTag,
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
- connection_pid = ConnPid }) ->
+ conn_pid = ConnPid }) ->
%% FIXME: connection exception (!) on failure??
%% (see rule named "failure" in spec-XML)
%% FIXME: don't allow binding to internal exchanges -
@@ -1412,13 +1412,13 @@ coalesce_and_send(MsgSeqNos, MkMsgFun,
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
-i(pid, _) -> self();
-i(connection, #ch{connection_pid = Connection}) -> Connection;
-i(number, #ch{channel = Channel}) -> Channel;
-i(user, #ch{user = User}) -> User#user.username;
-i(vhost, #ch{virtual_host = VHost}) -> VHost;
-i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none;
-i(confirm, #ch{confirm_enabled = CE}) -> CE;
+i(pid, _) -> self();
+i(connection, #ch{conn_pid = ConnPid}) -> ConnPid;
+i(number, #ch{channel = Channel}) -> Channel;
+i(user, #ch{user = User}) -> User#user.username;
+i(vhost, #ch{virtual_host = VHost}) -> VHost;
+i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none;
+i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);
i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) ->
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index 7eec081808..65ccca0249 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -64,16 +64,16 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, User, VHost,
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, AState} = rabbit_command_assembler:init(Protocol),
{ok, SupPid, {ChannelPid, AState}};
-start_link({direct, Channel, ClientChannelPid, ConnectionPid, Protocol, User,
- VHost, Capabilities, Collector}) ->
+start_link({direct, Channel, ClientChannelPid, ConnPid, Protocol, User, VHost,
+ Capabilities, Collector}) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, ChannelPid} =
supervisor2:start_child(
SupPid,
{channel, {rabbit_channel, start_link,
- [Channel, ClientChannelPid, ClientChannelPid,
- ConnectionPid, Protocol, User, VHost, Capabilities,
- Collector, start_limiter_fun(SupPid)]},
+ [Channel, ClientChannelPid, ClientChannelPid, ConnPid,
+ Protocol, User, VHost, Capabilities, Collector,
+ start_limiter_fun(SupPid)]},
intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}),
{ok, SupPid, {ChannelPid, none}}.
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 568cbea35b..0810c762c6 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -69,11 +69,11 @@ connect(Username, Password, VHost, Protocol) ->
{error, broker_not_found_on_node}
end.
-start_channel(Number, ClientChannelPid, ConnectionPid, Protocol, User, VHost,
+start_channel(Number, ClientChannelPid, ConnPid, Protocol, User, VHost,
Capabilities, Collector) ->
{ok, _, {ChannelPid, _}} =
supervisor2:start_child(
rabbit_direct_client_sup,
- [{direct, Number, ClientChannelPid, ConnectionPid, Protocol, User,
- VHost, Capabilities, Collector}]),
+ [{direct, Number, ClientChannelPid, ConnPid, Protocol, User, VHost,
+ Capabilities, Collector}]),
{ok, ChannelPid}.