summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-09-13 11:20:30 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-09-13 11:20:30 +0100
commitab8f4ad3725b73e9a03476e291ac9b4e8ea16699 (patch)
tree9978cf2e24b8066c83a6e9d41b5e05f79b8b2aa8
parent31beb216a8a8cd2fc2bc21999c61be9c73394937 (diff)
downloadrabbitmq-server-git-ab8f4ad3725b73e9a03476e291ac9b4e8ea16699.tar.gz
remove unnecessary variable
-rw-r--r--src/rabbit_channel.erl91
1 files changed, 40 insertions, 51 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 0676838088..4acda1563c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -38,7 +38,7 @@
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, consumer_monitors, queue_collector_pid,
stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
- unconfirmed_qm, confirm_queues, confirmed, capabilities, trace_state}).
+ unconfirmed_qm, confirmed, capabilities, trace_state}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -198,7 +198,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
publish_seqno = 1,
unconfirmed_mq = gb_trees:empty(),
unconfirmed_qm = gb_trees:empty(),
- confirm_queues = gb_sets:new(),
confirmed = [],
capabilities = Capabilities,
trace_state = rabbit_trace:init(VHost)},
@@ -547,42 +546,38 @@ confirm(MsgSeqNos, QPid, State) ->
record_confirms(MXs, State1).
process_confirms(MsgSeqNos, QPid, Nack, State = #ch{unconfirmed_mq = UMQ,
- unconfirmed_qm = UQM,
- confirm_queues = CQs}) ->
- {MXs, UMQ1, UQM1, CQs1} =
+ unconfirmed_qm = UQM}) ->
+ {MXs, UMQ1, UQM1} =
lists:foldl(
- fun(MsgSeqNo, {_MXs, UMQ0, _UQM, _CQs} = Acc) ->
+ fun(MsgSeqNo, {_MXs, UMQ0, _UQM} = Acc) ->
case gb_trees:lookup(MsgSeqNo, UMQ0) of
{value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ,
Acc, Nack);
none -> Acc
end
- end, {[], UMQ, UQM, CQs}, MsgSeqNos),
- {MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1,
- confirm_queues = CQs1}}.
-
-remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM, CQs}, Nack) ->
- {UQM1, CQs2} = case gb_trees:lookup(QPid, UQM) of
- {value, {MRef, MsgSeqNos}} ->
- MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
- case gb_sets:is_empty(MsgSeqNos1) of
- true -> erlang:demonitor(MRef),
- {gb_trees:delete(QPid, UQM),
- gb_sets:del_element(QPid, CQs)};
- false -> {gb_trees:update(QPid, {MRef, MsgSeqNos1},
- UQM), CQs}
- end;
- none ->
- {UQM, CQs}
- end,
+ end, {[], UMQ, UQM}, MsgSeqNos),
+ {MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}.
+
+remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack) ->
+ UQM1 = case gb_trees:lookup(QPid, UQM) of
+ {value, {MRef, MsgSeqNos}} ->
+ MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
+ case gb_sets:is_empty(MsgSeqNos1) of
+ true -> erlang:demonitor(MRef),
+ gb_trees:delete(QPid, UQM);
+ false -> gb_trees:update(QPid, {MRef, MsgSeqNos1}, UQM)
+ end;
+ none ->
+ UQM
+ end,
Qs1 = gb_sets:del_element(QPid, Qs),
%% If QPid somehow died initiating a nack, clear the message from
%% internal data-structures. Also, cleanup empty entries.
case (Nack orelse gb_sets:is_empty(Qs1)) of
true ->
- {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ), UQM1, CQs2};
+ {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ), UQM1};
false ->
- {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1, CQs2}
+ {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1}
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
@@ -1143,15 +1138,13 @@ monitor_consumer(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping,
State
end.
-monitor_confirm_queue(QPid, ConfirmQueues) ->
- case gb_sets:is_member(QPid, ConfirmQueues) of
- true -> {undefined, ConfirmQueues};
- false -> MRef = erlang:monitor(process, QPid),
- {MRef, gb_sets:insert(MRef, ConfirmQueues)}
+monitor_confirm_queue(QPid, UQM) ->
+ case gb_trees:is_defined(QPid, UQM) of
+ true -> undefined;
+ false -> erlang:monitor(process, QPid)
end.
-handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM,
- confirm_queues = CQs}) ->
+handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
{value, {_MRef, MsgSet}} -> gb_sets:to_list(MsgSet);
none -> []
@@ -1159,8 +1152,7 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM,
%% We remove the MsgSeqNos from UQM before calling
%% process_confirms to prevent each MsgSeqNo being removed from
%% the set one by one which which would be inefficient
- State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM),
- confirm_queues = gb_sets:del_element(QPid, CQs)},
+ State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)},
{Nack, SendFun} =
case Reason of
Reason when Reason =:= noproc; Reason =:= noconnection;
@@ -1366,24 +1358,21 @@ process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
process_routing_result(routed, _, _, undefined, _, State) ->
State;
process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
- #ch{unconfirmed_mq = UMQ, unconfirmed_qm = UQM, confirm_queues = CQs} = State,
+ #ch{unconfirmed_mq = UMQ, unconfirmed_qm = UQM} = State,
UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ),
SingletonSet = gb_sets:singleton(MsgSeqNo),
- {UQM1, CQs1} = lists:foldl(
- fun (QPid, {UQM2, CQs2}) ->
- case gb_trees:lookup(QPid, UQM2) of
- {value, {MRef, MsgSeqNos}} ->
- MsgSeqNos1 = gb_sets:insert(MsgSeqNo,
- MsgSeqNos),
- {gb_trees:update(QPid, {MRef, MsgSeqNos1},
- UQM2), CQs2};
- none ->
- {MRef, CQs3} = monitor_confirm_queue(QPid, CQs2),
- {gb_trees:insert(QPid, {MRef, SingletonSet},
- UQM2), CQs3}
- end
- end, {UQM, CQs}, QPids),
- State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1, confirm_queues = CQs1}.
+ UQM1 = lists:foldl(
+ fun (QPid, UQM2) ->
+ case gb_trees:lookup(QPid, UQM2) of
+ {value, {MRef, MsgSeqNos}} ->
+ MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos),
+ gb_trees:update(QPid, {MRef, MsgSeqNos1}, UQM2);
+ none ->
+ MRef = monitor_confirm_queue(QPid, UQM2),
+ gb_trees:insert(QPid, {MRef, SingletonSet}, UQM2)
+ end
+ end, UQM, QPids),
+ State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};