diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-07 11:17:33 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-07 11:17:33 +0000 |
| commit | 10aec532ddb744e1576d3ef6b1a5af4810cd5efb (patch) | |
| tree | 1e1c3e7536c45ec7dcf2228eff3602f5add31698 | |
| parent | df61129d41af4c90645e7f3153060bb25feb2f5c (diff) | |
| download | rabbitmq-server-git-10aec532ddb744e1576d3ef6b1a5af4810cd5efb.tar.gz | |
Notify on channel down.
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 10 |
1 files changed, 10 insertions, 0 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 886db8d1d3..94cfa61980 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -701,6 +701,12 @@ remove_consumers(ChPid, Queue, QName) -> true end, Queue). +channel_consumers(ChPid, Queue) -> + priority_queue:fold( + fun ({CP, #consumer{tag = CTag}}, _, Acc) when CP =:= ChPid -> + [CTag | Acc] + end, [], Queue). + possibly_unblock(State, ChPid, Update) -> case lookup_ch(ChPid) of not_found -> State; @@ -760,6 +766,10 @@ handle_ch_down(DownPid, State = #q{active_consumers = AC, end, State2 = State1#q{active_consumers = AC1, exclusive_consumer = Holder1}, + [notify_decorators(basic_cancel, [{consumer_tag, CTag}], State2) || + CTag <- channel_consumers(ChPid, AC)], + [notify_decorators(basic_cancel, [{consumer_tag, CTag}], State2) || + CTag <- channel_consumers(ChPid, Blocked)], case should_auto_delete(State2) of true -> {stop, State2}; false -> {ok, requeue_and_run(queue:to_list(ChAckTags), |
