diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 10 |
1 files changed, 10 insertions, 0 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 886db8d1d3..94cfa61980 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -701,6 +701,12 @@ remove_consumers(ChPid, Queue, QName) -> true end, Queue). +channel_consumers(ChPid, Queue) -> + priority_queue:fold( + fun ({CP, #consumer{tag = CTag}}, _, Acc) when CP =:= ChPid -> + [CTag | Acc] + end, [], Queue). + possibly_unblock(State, ChPid, Update) -> case lookup_ch(ChPid) of not_found -> State; @@ -760,6 +766,10 @@ handle_ch_down(DownPid, State = #q{active_consumers = AC, end, State2 = State1#q{active_consumers = AC1, exclusive_consumer = Holder1}, + [notify_decorators(basic_cancel, [{consumer_tag, CTag}], State2) || + CTag <- channel_consumers(ChPid, AC)], + [notify_decorators(basic_cancel, [{consumer_tag, CTag}], State2) || + CTag <- channel_consumers(ChPid, Blocked)], case should_auto_delete(State2) of true -> {stop, State2}; false -> {ok, requeue_and_run(queue:to_list(ChAckTags), |
