summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-07 11:17:33 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-07 11:17:33 +0000
commit10aec532ddb744e1576d3ef6b1a5af4810cd5efb (patch)
tree1e1c3e7536c45ec7dcf2228eff3602f5add31698
parentdf61129d41af4c90645e7f3153060bb25feb2f5c (diff)
downloadrabbitmq-server-git-10aec532ddb744e1576d3ef6b1a5af4810cd5efb.tar.gz
Notify on channel down.
-rw-r--r--src/rabbit_amqqueue_process.erl10
1 files changed, 10 insertions, 0 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 886db8d1d3..94cfa61980 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -701,6 +701,12 @@ remove_consumers(ChPid, Queue, QName) ->
true
end, Queue).
+channel_consumers(ChPid, Queue) ->
+ priority_queue:fold(
+ fun ({CP, #consumer{tag = CTag}}, _, Acc) when CP =:= ChPid ->
+ [CTag | Acc]
+ end, [], Queue).
+
possibly_unblock(State, ChPid, Update) ->
case lookup_ch(ChPid) of
not_found -> State;
@@ -760,6 +766,10 @@ handle_ch_down(DownPid, State = #q{active_consumers = AC,
end,
State2 = State1#q{active_consumers = AC1,
exclusive_consumer = Holder1},
+ [notify_decorators(basic_cancel, [{consumer_tag, CTag}], State2) ||
+ CTag <- channel_consumers(ChPid, AC)],
+ [notify_decorators(basic_cancel, [{consumer_tag, CTag}], State2) ||
+ CTag <- channel_consumers(ChPid, Blocked)],
case should_auto_delete(State2) of
true -> {stop, State2};
false -> {ok, requeue_and_run(queue:to_list(ChAckTags),