diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 14 |
1 files changed, 9 insertions, 5 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index c60adb5b58..068f89f046 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -175,10 +175,11 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) -> C = #cr{ch_pid = ChPid, acktags = ChAckTags, blocked_consumers = BlockedQ} -> - AllConsumers = priority_queue:join(Consumers, BlockedQ), + All = priority_queue:join(Consumers, BlockedQ), ok = erase_ch_record(C), + Filtered = priority_queue:filter(chan_pred(ChPid, true), All), {[AckTag || {AckTag, _CTag} <- queue:to_list(ChAckTags)], - tags(priority_queue:to_list(AllConsumers)), + tags(priority_queue:to_list(Filtered)), State#state{consumers = remove_consumers(ChPid, Consumers)}} end. @@ -442,9 +443,12 @@ remove_consumer(ChPid, CTag, Queue) -> end, Queue). remove_consumers(ChPid, Queue) -> - priority_queue:filter(fun ({CP, _Consumer}) when CP =:= ChPid -> false; - (_) -> true - end, Queue). + priority_queue:filter(chan_pred(ChPid, false), Queue). + +chan_pred(ChPid, Want) -> + fun ({CP, _Consumer}) when CP =:= ChPid -> Want; + (_) -> not Want + end. update_use({inactive, _, _, _} = CUInfo, inactive) -> CUInfo; |
