summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2015-03-30 14:17:15 +0100
committerSimon MacMullen <simon@rabbitmq.com>2015-03-30 14:17:15 +0100
commit4fdd61b9c68b911b7d8c35bed385fb2167f173fa (patch)
tree5504ab9764d308550f151a4223c7ad8f7a2069e8
parentc4c4dc00acf3375eee5dfdb572d27e0a91f75061 (diff)
parentbd4f8aec7edf5c295d24adf3edf0cebe3d6e8449 (diff)
downloadrabbitmq-server-git-4fdd61b9c68b911b7d8c35bed385fb2167f173fa.tar.gz
Merge branch 'rabbitmq-server-86' into stablerabbitmq_v3_5_1
-rw-r--r--src/rabbit_queue_consumers.erl14
1 files changed, 9 insertions, 5 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index c60adb5b58..068f89f046 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -175,10 +175,11 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) ->
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
blocked_consumers = BlockedQ} ->
- AllConsumers = priority_queue:join(Consumers, BlockedQ),
+ All = priority_queue:join(Consumers, BlockedQ),
ok = erase_ch_record(C),
+ Filtered = priority_queue:filter(chan_pred(ChPid, true), All),
{[AckTag || {AckTag, _CTag} <- queue:to_list(ChAckTags)],
- tags(priority_queue:to_list(AllConsumers)),
+ tags(priority_queue:to_list(Filtered)),
State#state{consumers = remove_consumers(ChPid, Consumers)}}
end.
@@ -442,9 +443,12 @@ remove_consumer(ChPid, CTag, Queue) ->
end, Queue).
remove_consumers(ChPid, Queue) ->
- priority_queue:filter(fun ({CP, _Consumer}) when CP =:= ChPid -> false;
- (_) -> true
- end, Queue).
+ priority_queue:filter(chan_pred(ChPid, false), Queue).
+
+chan_pred(ChPid, Want) ->
+ fun ({CP, _Consumer}) when CP =:= ChPid -> Want;
+ (_) -> not Want
+ end.
update_use({inactive, _, _, _} = CUInfo, inactive) ->
CUInfo;