summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2015-03-30 14:17:28 +0100
committerSimon MacMullen <simon@rabbitmq.com>2015-03-30 14:17:28 +0100
commitcf4ad7d6bafe27ba9557bafbfbe6de1ff8317694 (patch)
tree38f184d4a9193997aac59470cc4d855724b594d7 /src
parent6ec20ab9fc3fdad7d0574d646d6caad4a8c2600a (diff)
parent4fdd61b9c68b911b7d8c35bed385fb2167f173fa (diff)
downloadrabbitmq-server-git-cf4ad7d6bafe27ba9557bafbfbe6de1ff8317694.tar.gz
Merge branch 'stable'
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_consumers.erl14
1 files changed, 9 insertions, 5 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index c60adb5b58..068f89f046 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -175,10 +175,11 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) ->
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
blocked_consumers = BlockedQ} ->
- AllConsumers = priority_queue:join(Consumers, BlockedQ),
+ All = priority_queue:join(Consumers, BlockedQ),
ok = erase_ch_record(C),
+ Filtered = priority_queue:filter(chan_pred(ChPid, true), All),
{[AckTag || {AckTag, _CTag} <- queue:to_list(ChAckTags)],
- tags(priority_queue:to_list(AllConsumers)),
+ tags(priority_queue:to_list(Filtered)),
State#state{consumers = remove_consumers(ChPid, Consumers)}}
end.
@@ -442,9 +443,12 @@ remove_consumer(ChPid, CTag, Queue) ->
end, Queue).
remove_consumers(ChPid, Queue) ->
- priority_queue:filter(fun ({CP, _Consumer}) when CP =:= ChPid -> false;
- (_) -> true
- end, Queue).
+ priority_queue:filter(chan_pred(ChPid, false), Queue).
+
+chan_pred(ChPid, Want) ->
+ fun ({CP, _Consumer}) when CP =:= ChPid -> Want;
+ (_) -> not Want
+ end.
update_use({inactive, _, _, _} = CUInfo, inactive) ->
CUInfo;