summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_consumers.erl14
1 files changed, 9 insertions, 5 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index c60adb5b58..068f89f046 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -175,10 +175,11 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) ->
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
blocked_consumers = BlockedQ} ->
- AllConsumers = priority_queue:join(Consumers, BlockedQ),
+ All = priority_queue:join(Consumers, BlockedQ),
ok = erase_ch_record(C),
+ Filtered = priority_queue:filter(chan_pred(ChPid, true), All),
{[AckTag || {AckTag, _CTag} <- queue:to_list(ChAckTags)],
- tags(priority_queue:to_list(AllConsumers)),
+ tags(priority_queue:to_list(Filtered)),
State#state{consumers = remove_consumers(ChPid, Consumers)}}
end.
@@ -442,9 +443,12 @@ remove_consumer(ChPid, CTag, Queue) ->
end, Queue).
remove_consumers(ChPid, Queue) ->
- priority_queue:filter(fun ({CP, _Consumer}) when CP =:= ChPid -> false;
- (_) -> true
- end, Queue).
+ priority_queue:filter(chan_pred(ChPid, false), Queue).
+
+chan_pred(ChPid, Want) ->
+ fun ({CP, _Consumer}) when CP =:= ChPid -> Want;
+ (_) -> not Want
+ end.
update_use({inactive, _, _, _} = CUInfo, inactive) ->
CUInfo;