diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-02-17 17:20:01 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-02-17 17:20:01 +0000 |
| commit | 47791dccd45c0182cdd2e28e4339e5c4e896d2cf (patch) | |
| tree | 8a752d7b41408d9714cfe13d10a31078cba2deeb /src | |
| parent | e1abf94150145a0654275584e0e59cf3bb185654 (diff) | |
| download | rabbitmq-server-git-47791dccd45c0182cdd2e28e4339e5c4e896d2cf.tar.gz | |
Tidy up on DOWN and emit basic.cancel
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 46 |
1 files changed, 32 insertions, 14 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 346ec371f9..1da8c95945 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -281,20 +281,15 @@ handle_cast({confirm, MsgSeqNos, From}, State) -> handle_info(timeout, State) -> noreply(State); -handle_info({'DOWN', _MRef, process, QPid, Reason}, - State = #ch{unconfirmed = UC}) -> - %% TODO: this does a complete scan and partial rebuild of the - %% tree, which is quite efficient. To do better we'd need to - %% maintain a secondary mapping, from QPids to MsgSeqNos. - {MXs, UC1} = remove_queue_unconfirmed( - gb_trees:next(gb_trees:iterator(UC)), QPid, - {[], UC}, State), - erase_queue_stats(QPid), - State1 = case Reason of - normal -> record_confirms(MXs, State#ch{unconfirmed = UC1}); - _ -> send_nacks(MXs, State#ch{unconfirmed = UC1}) - end, - noreply(queue_blocked(QPid, State1)). +handle_info({'DOWN', MRef, process, QPid, Reason}, + State = #ch{consumer_monitors = ConsumerMonitors}) -> + noreply( + case dict:find(MRef, ConsumerMonitors) of + error -> + handle_non_consumer_down(QPid, Reason, State); + {ok, ConsumerTag} -> + handle_consumer_down(MRef, ConsumerTag, State) + end). handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), @@ -1061,6 +1056,29 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- +handle_non_consumer_down(QPid, Reason, State = #ch{unconfirmed = UC}) -> + %% TODO: this does a complete scan and partial rebuild of the + %% tree, which is quite efficient. To do better we'd need to + %% maintain a secondary mapping, from QPids to MsgSeqNos. + {MXs, UC1} = remove_queue_unconfirmed( + gb_trees:next(gb_trees:iterator(UC)), QPid, + {[], UC}, State), + erase_queue_stats(QPid), + State1 = case Reason of + normal -> record_confirms(MXs, State#ch{unconfirmed = UC1}); + _ -> send_nacks(MXs, State#ch{unconfirmed = UC1}) + end, + queue_blocked(QPid, State1). + +handle_consumer_down(MRef, ConsumerTag, + State = #ch{consumer_monitors = ConsumerMonitors, + writer_pid = WriterPid}) -> + ConsumerMonitors1 = dict:erase(MRef, ConsumerMonitors), + Cancel = #'basic.cancel'{consumer_tag = ConsumerTag, + nowait = true}, + ok = rabbit_writer:send_command(WriterPid, Cancel), + State#ch{consumer_monitors = ConsumerMonitors1}. + binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, |
