diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2011-03-16 14:29:58 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2011-03-16 14:29:58 +0000 |
| commit | 23d18d56c8a9ec6e12410038253c090b817ae293 (patch) | |
| tree | 0051c6f5bf757b7b6dd7be629e2396e84243d71b | |
| parent | 894201a627dae3aec33648d047bafa85d9cf9567 (diff) | |
| download | rabbitmq-server-git-23d18d56c8a9ec6e12410038253c090b817ae293.tar.gz | |
Say "ConnPid" everywhere.
| -rw-r--r-- | src/rabbit_channel.erl | 64 | ||||
| -rw-r--r-- | src/rabbit_channel_sup.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 6 |
3 files changed, 40 insertions, 40 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 370654a91a..2d2d9d6038 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -29,7 +29,7 @@ handle_info/2, handle_pre_hibernate/1, prioritise_call/3, prioritise_cast/2]). --record(ch, {state, protocol, channel, reader_pid, writer_pid, connection_pid, +-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, limiter_pid, start_limiter_fun, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, user, virtual_host, most_recently_declared_queue, @@ -96,10 +96,10 @@ %%---------------------------------------------------------------------------- -start_link(Channel, ReaderPid, WriterPid, ConnectionPid, Protocol, User, VHost, +start_link(Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, Capabilities, CollectorPid, StartLimiterFun) -> gen_server2:start_link( - ?MODULE, [Channel, ReaderPid, WriterPid, ConnectionPid, Protocol, User, + ?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, Capabilities, CollectorPid, StartLimiterFun], []). do(Pid, Method) -> @@ -154,7 +154,7 @@ ready_for_close(Pid) -> %%--------------------------------------------------------------------------- -init([Channel, ReaderPid, WriterPid, ConnectionPid, Protocol, User, VHost, +init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, Capabilities, CollectorPid, StartLimiterFun]) -> process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), @@ -164,7 +164,7 @@ init([Channel, ReaderPid, WriterPid, ConnectionPid, Protocol, User, VHost, channel = Channel, reader_pid = ReaderPid, writer_pid = WriterPid, - connection_pid = ConnectionPid, + conn_pid = ConnPid, limiter_pid = undefined, start_limiter_fun = StartLimiterFun, transaction_id = none, @@ -362,15 +362,15 @@ return_ok(State, false, Msg) -> {reply, Msg, State}. ok_msg(true, _Msg) -> undefined; ok_msg(false, Msg) -> Msg. -send_exception(Reason, State = #ch{protocol = Protocol, - channel = Channel, - writer_pid = WriterPid, - reader_pid = ReaderPid, - connection_pid = ConnectionPid}) -> +send_exception(Reason, State = #ch{protocol = Protocol, + channel = Channel, + writer_pid = WriterPid, + reader_pid = ReaderPid, + conn_pid = ConnPid}) -> {CloseChannel, CloseMethod} = rabbit_binary_generator:map_exception(Channel, Reason, Protocol), rabbit_log:error("connection ~p, channel ~p - error:~n~p~n", - [ConnectionPid, Channel, Reason]), + [ConnPid, Channel, Reason]), %% something bad's happened: rollback_and_notify may not be 'ok' {_Result, State1} = rollback_and_notify(State), case CloseChannel of @@ -652,12 +652,12 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, _, State = #ch{writer_pid = WriterPid, - connection_pid = ConnectionPid, - next_tag = DeliveryTag}) -> + conn_pid = ConnPid, + next_tag = DeliveryTag}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), case rabbit_amqqueue:with_exclusive_access_or_die( - QueueName, ConnectionPid, + QueueName, ConnPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, Msg = {_QName, QPid, _MsgId, Redelivered, @@ -691,7 +691,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, no_ack = NoAck, exclusive = ExclusiveConsume, nowait = NoWait}, - _, State = #ch{connection_pid = ConnectionPid, + _, State = #ch{conn_pid = ConnPid, limiter_pid = LimiterPid, consumer_mapping = ConsumerMapping}) -> case dict:find(ConsumerTag, ConsumerMapping) of @@ -708,7 +708,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, %% behalf. This is for symmetry with basic.cancel - see %% the comment in that method for why. case rabbit_amqqueue:with_exclusive_access_or_die( - QueueName, ConnectionPid, + QueueName, ConnPid, fun (Q) -> {rabbit_amqqueue:basic_consume( Q, NoAck, self(), LimiterPid, @@ -923,10 +923,10 @@ handle_method(#'queue.declare'{queue = QueueNameBin, nowait = NoWait, arguments = Args} = Declare, _, State = #ch{virtual_host = VHostPath, - connection_pid = ConnectionPid, + conn_pid = ConnPid, queue_collector_pid = CollectorPid}) -> Owner = case ExclusiveDeclare of - true -> ConnectionPid; + true -> ConnPid; false -> none end, ActualNameBin = case QueueNameBin of @@ -969,13 +969,13 @@ handle_method(#'queue.declare'{queue = QueueNameBin, passive = true, nowait = NoWait}, _, State = #ch{virtual_host = VHostPath, - connection_pid = ConnectionPid}) -> + conn_pid = ConnPid}) -> QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), check_configure_permitted(QueueName, State), {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} = rabbit_amqqueue:with_or_die( QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end), - ok = rabbit_amqqueue:check_exclusive_access(Q, ConnectionPid), + ok = rabbit_amqqueue:check_exclusive_access(Q, ConnPid), return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, State); @@ -983,11 +983,11 @@ handle_method(#'queue.delete'{queue = QueueNameBin, if_unused = IfUnused, if_empty = IfEmpty, nowait = NoWait}, - _, State = #ch{connection_pid = ConnectionPid}) -> + _, State = #ch{conn_pid = ConnPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_configure_permitted(QueueName, State), case rabbit_amqqueue:with_exclusive_access_or_die( - QueueName, ConnectionPid, + QueueName, ConnPid, fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of {error, in_use} -> rabbit_misc:protocol_error( @@ -1019,11 +1019,11 @@ handle_method(#'queue.unbind'{queue = QueueNameBin, handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, - _, State = #ch{connection_pid = ConnectionPid}) -> + _, State = #ch{conn_pid = ConnPid}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), {ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die( - QueueName, ConnectionPid, + QueueName, ConnPid, fun (Q) -> rabbit_amqqueue:purge(Q) end), return_ok(State, NoWait, #'queue.purge_ok'{message_count = PurgedMessageCount}); @@ -1144,7 +1144,7 @@ handle_consuming_queue_down(MRef, ConsumerTag, binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, - connection_pid = ConnPid }) -> + conn_pid = ConnPid }) -> %% FIXME: connection exception (!) on failure?? %% (see rule named "failure" in spec-XML) %% FIXME: don't allow binding to internal exchanges - @@ -1412,13 +1412,13 @@ coalesce_and_send(MsgSeqNos, MkMsgFun, infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. -i(pid, _) -> self(); -i(connection, #ch{connection_pid = Connection}) -> Connection; -i(number, #ch{channel = Channel}) -> Channel; -i(user, #ch{user = User}) -> User#user.username; -i(vhost, #ch{virtual_host = VHost}) -> VHost; -i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none; -i(confirm, #ch{confirm_enabled = CE}) -> CE; +i(pid, _) -> self(); +i(connection, #ch{conn_pid = ConnPid}) -> ConnPid; +i(number, #ch{channel = Channel}) -> Channel; +i(user, #ch{user = User}) -> User#user.username; +i(vhost, #ch{virtual_host = VHost}) -> VHost; +i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none; +i(confirm, #ch{confirm_enabled = CE}) -> CE; i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) -> dict:size(ConsumerMapping); i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) -> diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 7eec081808..65ccca0249 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -64,16 +64,16 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, User, VHost, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, AState} = rabbit_command_assembler:init(Protocol), {ok, SupPid, {ChannelPid, AState}}; -start_link({direct, Channel, ClientChannelPid, ConnectionPid, Protocol, User, - VHost, Capabilities, Collector}) -> +start_link({direct, Channel, ClientChannelPid, ConnPid, Protocol, User, VHost, + Capabilities, Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, ChannelPid} = supervisor2:start_child( SupPid, {channel, {rabbit_channel, start_link, - [Channel, ClientChannelPid, ClientChannelPid, - ConnectionPid, Protocol, User, VHost, Capabilities, - Collector, start_limiter_fun(SupPid)]}, + [Channel, ClientChannelPid, ClientChannelPid, ConnPid, + Protocol, User, VHost, Capabilities, Collector, + start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, SupPid, {ChannelPid, none}}. diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 568cbea35b..0810c762c6 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -69,11 +69,11 @@ connect(Username, Password, VHost, Protocol) -> {error, broker_not_found_on_node} end. -start_channel(Number, ClientChannelPid, ConnectionPid, Protocol, User, VHost, +start_channel(Number, ClientChannelPid, ConnPid, Protocol, User, VHost, Capabilities, Collector) -> {ok, _, {ChannelPid, _}} = supervisor2:start_child( rabbit_direct_client_sup, - [{direct, Number, ClientChannelPid, ConnectionPid, Protocol, User, - VHost, Capabilities, Collector}]), + [{direct, Number, ClientChannelPid, ConnPid, Protocol, User, VHost, + Capabilities, Collector}]), {ok, ChannelPid}. |
