summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl46
1 files changed, 24 insertions, 22 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index c73d85ac05..6e57ff9fe3 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -192,7 +192,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
most_recently_declared_queue = <<>>,
consumer_mapping = dict:new(),
blocking = gb_sets:new(),
- consumer_monitors = gb_sets:new(),
+ consumer_monitors = dict:new(),
queue_collector_pid = CollectorPid,
stats_timer = StatsTimer,
confirm_enabled = false,
@@ -330,7 +330,7 @@ handle_info({'DOWN', _MRef, process, QPid, Reason},
State1 = handle_publishing_queue_down(QPid, Reason, State),
erase_queue_stats(QPid),
State2 = queue_blocked(QPid, State1),
- State3 = case gb_sets:is_member(QPid, ConsumerMonitors) of
+ State3 = case dict:is_key(QPid, ConsumerMonitors) of
false -> State2;
true -> handle_consuming_queue_down(QPid, State1)
end,
@@ -782,14 +782,14 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
%% Spec requires we ignore this situation.
return_ok(State, NoWait, OkMsg);
{ok, Q = #amqqueue{pid = QPid}} ->
- ConsumerMapping1 = dict:erase(ConsumerTag,ConsumerMapping),
+ ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping),
ConsumerMonitors1 =
- case dict:size(dict:filter(fun (_, #amqqueue{pid = QPid0})
- when QPid0 =:= QPid -> true;
- (_, _) -> false
- end, ConsumerMapping1)) of
- 0 -> gb_sets:delete(QPid, ConsumerMonitors);
- _ -> ConsumerMonitors
+ case dict:find(QPid, ConsumerMonitors) of
+ error -> ConsumerMonitors;
+ {ok, CTags} -> case gb_sets:size(CTags) of
+ 1 -> dict:erase(QPid, ConsumerMonitors);
+ _ -> dict:store(QPid, gb_sets:delete(ConsumerTag, CTags), ConsumerMonitors)
+ end
end,
State1 = State#ch{consumer_mapping = ConsumerMapping1,
consumer_monitors = ConsumerMonitors1},
@@ -1155,9 +1155,12 @@ consumer_monitor(ConsumerTag,
Capabilities, <<"consumer_cancel_notify">>) of
{bool, true} ->
#amqqueue{pid = QPid} = dict:fetch(ConsumerTag, ConsumerMapping),
- monitor_queue(QPid,
- State#ch{consumer_monitors =
- gb_sets:add(QPid, ConsumerMonitors)});
+ ConsumerMonitors1 =
+ dict:update(QPid,
+ fun (CTags) -> gb_sets:insert(ConsumerTag, CTags) end,
+ gb_sets:singleton(ConsumerTag),
+ ConsumerMonitors),
+ monitor_queue(QPid, State#ch{consumer_monitors = ConsumerMonitors1});
_ ->
State
end.
@@ -1178,7 +1181,7 @@ queue_monitor_needed(QPid, #ch{stats_timer = StatsTimer,
blocking = Blocking,
unconfirmed_qm = UQM}) ->
StatsEnabled = rabbit_event:stats_level(StatsTimer) =:= fine,
- ConsumerMonitored = gb_sets:is_member(QPid, ConsumerMonitors),
+ ConsumerMonitored = dict:is_key(QPid, ConsumerMonitors),
QueueBlocked = gb_sets:is_element(QPid, Blocking),
ConfirmMonitored = gb_trees:is_defined(QPid, UQM),
StatsEnabled or ConsumerMonitored or QueueBlocked or ConfirmMonitored.
@@ -1209,19 +1212,18 @@ handle_consuming_queue_down(QPid,
State = #ch{consumer_mapping = ConsumerMapping,
consumer_monitors = ConsumerMonitors,
writer_pid = WriterPid}) ->
- {ConsumerTags, ConsumerMapping1} =
- dict:fold(fun (CTag, #amqqueue{pid = QPid0}, {CTags, CMap})
- when QPid =:= QPid0 ->
- {[CTag | CTags], CMap};
- (CTag, Q, {CTags, CMap}) ->
- {CTags, dict:store(CTag, Q, CMap)}
- end, {[], dict:new()}, ConsumerMapping),
- ConsumerMonitors1 = gb_sets:delete(QPid, ConsumerMonitors),
+ ConsumerTags = case dict:find(QPid, ConsumerMonitors) of
+ error -> [];
+ {ok, CTags} -> CTags
+ end,
+ ConsumerMonitors1 = dict:erase(QPid, ConsumerMonitors),
+ ConsumerMapping1 = gb_sets:fold(fun (CTag, CMap) -> dict:erase(CTag, CMap) end,
+ ConsumerMapping, ConsumerTags),
[begin
Cancel = #'basic.cancel'{consumer_tag = ConsumerTag,
nowait = true},
ok = rabbit_writer:send_command(WriterPid, Cancel)
- end || ConsumerTag <- ConsumerTags],
+ end || ConsumerTag <- gb_sets:to_list(ConsumerTags)],
State#ch{consumer_mapping = ConsumerMapping1,
consumer_monitors = ConsumerMonitors1}.